Skip to main content

mz_adapter/
client.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::collections::BTreeMap;
12use std::fmt::{Debug, Display, Formatter};
13use std::future::Future;
14use std::pin::{self, Pin};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use anyhow::bail;
19use chrono::{DateTime, Utc};
20use derivative::Derivative;
21use futures::{Stream, StreamExt};
22use itertools::Itertools;
23use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
24use mz_auth::password::Password;
25use mz_auth::{Authenticated, AuthenticatorKind};
26use mz_build_info::BuildInfo;
27use mz_compute_types::ComputeInstanceId;
28use mz_ore::channel::OneshotReceiverExt;
29use mz_ore::collections::CollectionExt;
30use mz_ore::id_gen::{IdAllocator, IdAllocatorInnerBitSet, MAX_ORG_ID, org_id_conn_bits};
31use mz_ore::instrument;
32use mz_ore::now::{EpochMillis, NowFn, to_datetime};
33use mz_ore::str::StrExt;
34use mz_ore::task::AbortOnDropHandle;
35use mz_ore::thread::JoinOnDropHandle;
36use mz_ore::tracing::OpenTelemetryContext;
37use mz_repr::user::InternalUserMetadata;
38use mz_repr::{CatalogItemId, ColumnIndex, SqlScalarType};
39use mz_sql::ast::{Raw, Statement};
40use mz_sql::catalog::{EnvironmentId, SessionCatalog};
41use mz_sql::session::hint::ApplicationNameHint;
42use mz_sql::session::metadata::SessionMetadata;
43use mz_sql::session::user::SUPPORT_USER;
44use mz_sql::session::vars::{
45    CLUSTER, ENABLE_FRONTEND_PEEK_SEQUENCING, OwnedVarInput, SystemVars, Var,
46};
47use mz_sql_parser::parser::{ParserStatementError, StatementParseResult};
48use prometheus::Histogram;
49use serde_json::json;
50use tokio::sync::{mpsc, oneshot};
51use tracing::{debug, error};
52use uuid::Uuid;
53
54use crate::catalog::Catalog;
55use crate::command::{
56    CatalogDump, CatalogSnapshot, Command, CopyFromStdinWriter, ExecuteResponse, Response,
57    SASLChallengeResponse, SASLVerifyProofResponse, SuperuserAttribute,
58};
59use crate::config::{ScopedParameters, ScopedParametersScope, SystemParameterFrontend};
60use crate::coord::{Coordinator, ExecuteContextGuard};
61use crate::error::AdapterError;
62use crate::metrics::{self, Metrics};
63use crate::session::{
64    EndTransactionAction, PreparedStatement, Session, SessionConfig, StateRevision, TransactionId,
65};
66use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
67use crate::telemetry::{self, EventDetails, SegmentClientExt, StatementFailureType};
68use crate::webhook::AppendWebhookResponse;
69use crate::{AdapterNotice, AppendWebhookError, PeekClient, PeekResponseUnary, StartupResponse};
70
/// A handle to a running coordinator.
///
/// The coordinator runs on its own thread. Dropping the handle will wait for
/// the coordinator's thread to exit, which will only occur after all
/// outstanding [`Client`]s for the coordinator have dropped.
pub struct Handle {
    /// Identifier generated when the coordinator boots; exposed through
    /// [`Handle::session_id`].
    pub(crate) session_id: Uuid,
    /// The instant at which the coordinator booted; exposed through
    /// [`Handle::start_instant`].
    pub(crate) start_instant: Instant,
    /// Join-on-drop handle for the coordinator's thread. It is never read;
    /// it is held so that dropping the `Handle` waits for the thread to exit.
    pub(crate) _thread: JoinOnDropHandle<()>,
}

impl Handle {
    /// Returns the session ID associated with this coordinator.
    ///
    /// The session ID is generated on coordinator boot. It lasts for the
    /// lifetime of the coordinator. Restarting the coordinator will result
    /// in a new session ID.
    pub fn session_id(&self) -> Uuid {
        self.session_id
    }

    /// Returns the instant at which the coordinator booted.
    pub fn start_instant(&self) -> Instant {
        self.start_instant
    }
}
97
/// A coordinator client.
///
/// A coordinator client is a simple handle to a communication channel with the
/// coordinator. It can be cheaply cloned.
///
/// Clients keep the coordinator alive. The coordinator will not exit until all
/// outstanding clients have dropped.
#[derive(Debug, Clone)]
pub struct Client {
    /// Build information handed to every session created by
    /// [`Client::new_session`].
    build_info: &'static BuildInfo,
    /// Sending half of the coordinator's command channel. Every command is
    /// paired with the OpenTelemetry context obtained when it was sent.
    inner_cmd_tx: mpsc::UnboundedSender<(OpenTelemetryContext, Command)>,
    /// Allocator for connection IDs; see [`Client::new_conn_id`].
    id_alloc: IdAllocator<IdAllocatorInnerBitSet>,
    /// Clock function backing [`Client::now`].
    now: NowFn,
    /// Adapter-layer metrics; see [`Client::metrics`].
    metrics: Metrics,
    /// ID of the environment this client belongs to. Its organization ID
    /// seeds the connection ID allocator.
    environment_id: EnvironmentId,
    /// Optional Segment client, cloned into every [`SessionClient`] created
    /// by [`Client::startup`].
    segment_client: Option<mz_segment::Client>,
}
115
116impl Client {
117    pub(crate) fn new(
118        build_info: &'static BuildInfo,
119        cmd_tx: mpsc::UnboundedSender<(OpenTelemetryContext, Command)>,
120        metrics: Metrics,
121        now: NowFn,
122        environment_id: EnvironmentId,
123        segment_client: Option<mz_segment::Client>,
124    ) -> Client {
125        // Connection ids are 32 bits and have 3 parts.
126        // 1. MSB bit is always 0 because these are interpreted as an i32, and it is possible some
127        //    driver will not handle a negative id since postgres has never produced one because it
128        //    uses process ids.
129        // 2. Next 12 bits are the lower 12 bits of the org id. This allows balancerd to route
130        //    incoming cancel messages to a subset of the environments.
131        // 3. Last 19 bits are random.
132        let env_lower = org_id_conn_bits(&environment_id.organization_id());
133        Client {
134            build_info,
135            inner_cmd_tx: cmd_tx,
136            id_alloc: IdAllocator::new(1, MAX_ORG_ID, env_lower),
137            now,
138            metrics,
139            environment_id,
140            segment_client,
141        }
142    }
143
144    /// Allocates a client for an incoming connection.
145    pub fn new_conn_id(&self) -> Result<ConnectionId, AdapterError> {
146        self.id_alloc.alloc().ok_or(AdapterError::IdExhaustionError)
147    }
148
149    /// Creates a new session associated with this client for the given user.
150    ///
151    /// It is the caller's responsibility to have authenticated the user.
152    /// We pass in an Authenticated marker as a guardrail to ensure the
153    /// user has authenticated with an authenticator before creating a session.
154    pub fn new_session(&self, config: SessionConfig, _authenticated: Authenticated) -> Session {
155        // We use the system clock to determine when a session connected to Materialize. This is not
156        // intended to be 100% accurate and correct, so we don't burden the timestamp oracle with
157        // generating a more correct timestamp.
158        Session::new(self.build_info, config, self.metrics().session_metrics())
159    }
160
161    /// Used by [mz_auth::AuthenticatorKind::Password]
162    /// to verify the provided user's password against the
163    /// stored credentials in the catalog.
164    pub async fn authenticate(
165        &self,
166        user: &String,
167        password: &Password,
168    ) -> Result<Authenticated, AdapterError> {
169        let (tx, rx) = oneshot::channel();
170        self.send(Command::AuthenticatePassword {
171            role_name: user.to_string(),
172            password: Some(password.clone()),
173            tx,
174        });
175        rx.await.expect("sender dropped")?;
176        Ok(Authenticated)
177    }
178
179    /// Used by [mz_auth::AuthenticatorKind::Sasl] for SASL-SCRAM authentication.
180    /// This is used prior to [Client::verify_sasl_proof].
181    pub async fn generate_sasl_challenge(
182        &self,
183        user: &String,
184        client_nonce: &String,
185    ) -> Result<SASLChallengeResponse, AdapterError> {
186        let (tx, rx) = oneshot::channel();
187        self.send(Command::AuthenticateGetSASLChallenge {
188            role_name: user.to_string(),
189            nonce: client_nonce.to_string(),
190            tx,
191        });
192        let response = rx.await.expect("sender dropped")?;
193        Ok(response)
194    }
195
196    /// Used by [mz_auth::AuthenticatorKind::Sasl] for SASL-SCRAM authentication.
197    /// This is used after [Client::generate_sasl_challenge].
198    pub async fn verify_sasl_proof(
199        &self,
200        user: &String,
201        proof: &String,
202        nonce: &String,
203        mock_hash: &String,
204    ) -> Result<(SASLVerifyProofResponse, Authenticated), AdapterError> {
205        let (tx, rx) = oneshot::channel();
206        self.send(Command::AuthenticateVerifySASLProof {
207            role_name: user.to_string(),
208            proof: proof.to_string(),
209            auth_message: nonce.to_string(),
210            mock_hash: mock_hash.to_string(),
211            tx,
212        });
213        let response = rx.await.expect("sender dropped")?;
214        Ok((response, Authenticated))
215    }
216
217    /// Checks if a role exists and has the `LOGIN` attribute.
218    pub async fn role_can_login(&self, role_name: &str) -> Result<(), AdapterError> {
219        let (tx, rx) = oneshot::channel();
220        self.send(Command::CheckRoleCanLogin {
221            role_name: role_name.to_string(),
222            tx,
223        });
224        rx.await.expect("sender dropped")
225    }
226
227    /// Upgrades this client to a session client.
228    ///
229    /// A session is a connection that has successfully negotiated parameters,
230    /// like the user. Most coordinator operations are available only after
231    /// upgrading a connection to a session.
232    ///
233    /// Returns a new client that is bound to the session and a response
234    /// containing various details about the startup.
235    #[mz_ore::instrument(level = "debug")]
236    pub async fn startup(&self, session: Session) -> Result<SessionClient, AdapterError> {
237        let user = session.user().clone();
238        let conn_id = session.conn_id().clone();
239        let secret_key = session.secret_key();
240        let uuid = session.uuid();
241        let client_ip = session.client_ip();
242        let application_name = session.application_name().into();
243        let notice_tx = session.retain_notice_transmitter();
244
245        let (tx, rx) = oneshot::channel();
246
247        // ~~SPOOKY ZONE~~
248        //
249        // This guard prevents a race where the startup command finishes, but the Future returned
250        // by this function is concurrently dropped, so we never create a `SessionClient` and thus
251        // never cleanup the initialized Session.
252        let rx = rx.with_guard(|_| {
253            self.send(Command::Terminate {
254                conn_id: conn_id.clone(),
255                tx: None,
256            });
257        });
258
259        self.send(Command::Startup {
260            tx,
261            user,
262            conn_id: conn_id.clone(),
263            secret_key,
264            uuid,
265            client_ip: client_ip.copied(),
266            application_name,
267            notice_tx,
268        });
269
270        // When startup fails, no need to call terminate (handle_startup does this). Delay creating
271        // the client until after startup to sidestep the panic in its `Drop` implementation.
272        let response = rx.await.expect("sender dropped")?;
273
274        // Create the client as soon as startup succeeds (before any await points) so its `Drop` can
275        // handle termination.
276        // Build the PeekClient with controller handles returned from startup.
277        let StartupResponse {
278            role_id,
279            write_notify,
280            session_defaults,
281            catalog,
282            storage_collections,
283            transient_id_gen,
284            optimizer_metrics,
285            persist_client,
286            statement_logging_frontend,
287            superuser_attribute,
288        } = response;
289
290        let peek_client = PeekClient::new(
291            self.clone(),
292            storage_collections,
293            transient_id_gen,
294            optimizer_metrics,
295            persist_client,
296            statement_logging_frontend,
297        );
298
299        let mut client = SessionClient {
300            inner: Some(self.clone()),
301            session: Some(session),
302            timeouts: Timeout::new(),
303            environment_id: self.environment_id.clone(),
304            segment_client: self.segment_client.clone(),
305            peek_client,
306            enable_frontend_peek_sequencing: false, // initialized below, once we have a ConnCatalog
307        };
308
309        let session = client.session();
310
311        // Apply the superuser attribute to the session's user if
312        // it exists.
313        if let SuperuserAttribute(Some(superuser)) = superuser_attribute {
314            session.apply_internal_user_metadata(InternalUserMetadata { superuser });
315        }
316
317        session.initialize_role_metadata(role_id);
318        let vars_mut = session.vars_mut();
319        for (name, val) in session_defaults {
320            if let Err(err) = vars_mut.set_default(&name, val.borrow()) {
321                // Note: erroring here is unexpected, but we don't want to panic if somehow our
322                // assumptions are wrong.
323                tracing::error!("failed to set peristed default, {err:?}");
324            }
325        }
326        session
327            .vars_mut()
328            .end_transaction(EndTransactionAction::Commit);
329
330        // Stash the future that notifies us of builtin table writes completing, we'll block on
331        // this future before allowing queries from this session against relevant relations.
332        //
333        // Note: We stash the future as opposed to waiting on it here to prevent blocking session
334        // creation on builtin table updates. This improves the latency for session creation and
335        // reduces scheduling load on any dataflows that read from these builtin relations, since
336        // it allows updates to be batched.
337        session.set_builtin_table_updates(write_notify);
338
339        let catalog = catalog.for_session(session);
340
341        let cluster_active = session.vars().cluster().to_string();
342        if session.vars().welcome_message() {
343            let cluster_info = if catalog.resolve_cluster(Some(&cluster_active)).is_err() {
344                format!("{cluster_active} (does not exist)")
345            } else {
346                cluster_active.to_string()
347            };
348
349            // Emit a welcome message, optimized for readability by humans using
350            // interactive tools. If you change the message, make sure that it
351            // formats nicely in both `psql` and the console's SQL shell.
352            session.add_notice(AdapterNotice::Welcome(format!(
353                "connected to Materialize v{}
354  Environment ID: {}
355  Region: {}
356  User: {}
357  Cluster: {}
358  Database: {}
359  {}
360  Session UUID: {}
361
362Issue a SQL query to get started. Need help?
363  View documentation: https://materialize.com/s/docs
364  Join our Slack community: https://materialize.com/s/chat
365    ",
366                session.vars().build_info().semver_version(),
367                self.environment_id,
368                self.environment_id.region(),
369                session.vars().user().name,
370                cluster_info,
371                session.vars().database(),
372                match session.vars().search_path() {
373                    [schema] => format!("Schema: {}", schema),
374                    schemas => format!(
375                        "Search path: {}",
376                        schemas.iter().map(|id| id.to_string()).join(", ")
377                    ),
378                },
379                session.uuid(),
380            )));
381        }
382
383        if session.vars().current_object_missing_warnings() {
384            if catalog.active_database().is_none() {
385                let db = session.vars().database().into();
386                session.add_notice(AdapterNotice::UnknownSessionDatabase(db));
387            }
388        }
389
390        // Users stub their toe on their default cluster not existing, so we provide a notice to
391        // help guide them on what do to.
392        let cluster_var = session
393            .vars()
394            .inspect(CLUSTER.name())
395            .expect("cluster should exist");
396        if session.vars().current_object_missing_warnings()
397            && catalog.resolve_cluster(Some(&cluster_active)).is_err()
398        {
399            let cluster_notice = 'notice: {
400                if cluster_var.inspect_session_value().is_some() {
401                    break 'notice Some(AdapterNotice::DefaultClusterDoesNotExist {
402                        name: cluster_active,
403                        kind: "session",
404                        suggested_action: "Pick an extant cluster with SET CLUSTER = name. Run SHOW CLUSTERS to see available clusters.".into(),
405                    });
406                }
407
408                let role_default = catalog.get_role(catalog.active_role_id());
409                let role_cluster = match role_default.vars().get(CLUSTER.name()) {
410                    Some(OwnedVarInput::Flat(name)) => Some(name),
411                    None => None,
412                    // This is unexpected!
413                    Some(v @ OwnedVarInput::SqlSet(_)) => {
414                        tracing::warn!(?v, "SqlSet found for cluster Role Default");
415                        break 'notice None;
416                    }
417                };
418
419                let alter_role = "with `ALTER ROLE <role> SET cluster TO <cluster>;`";
420                match role_cluster {
421                    // If there is no default, suggest a Role default.
422                    None => Some(AdapterNotice::DefaultClusterDoesNotExist {
423                        name: cluster_active,
424                        kind: "system",
425                        suggested_action: format!(
426                            "Set a default cluster for the current role {alter_role}."
427                        ),
428                    }),
429                    // If the default does not exist, suggest to change it.
430                    Some(_) => Some(AdapterNotice::DefaultClusterDoesNotExist {
431                        name: cluster_active,
432                        kind: "role",
433                        suggested_action: format!(
434                            "Change the default cluster for the current role {alter_role}."
435                        ),
436                    }),
437                }
438            };
439
440            if let Some(notice) = cluster_notice {
441                session.add_notice(notice);
442            }
443        }
444
445        client.enable_frontend_peek_sequencing = ENABLE_FRONTEND_PEEK_SEQUENCING
446            .require(catalog.system_vars())
447            .is_ok();
448
449        Ok(client)
450    }
451
452    /// Cancels the query currently running on the specified connection.
453    pub fn cancel_request(&self, conn_id: ConnectionIdType, secret_key: u32) {
454        self.send(Command::CancelRequest {
455            conn_id,
456            secret_key,
457        });
458    }
459
460    /// Executes a single SQL statement that returns rows as the
461    /// `mz_support` user.
462    pub async fn support_execute_one(
463        &self,
464        sql: &str,
465    ) -> Result<Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send>>, anyhow::Error> {
466        // Connect to the coordinator.
467        let conn_id = self.new_conn_id()?;
468        let session = self.new_session(
469            SessionConfig {
470                conn_id,
471                uuid: Uuid::new_v4(),
472                user: SUPPORT_USER.name.clone(),
473                client_ip: None,
474                external_metadata_rx: None,
475                helm_chart_version: None,
476                authenticator_kind: AuthenticatorKind::None,
477                groups: None,
478            },
479            Authenticated,
480        );
481        let mut session_client = self.startup(session).await?;
482
483        // Parse the SQL statement.
484        let stmts = mz_sql::parse::parse(sql)?;
485        if stmts.len() != 1 {
486            bail!("must supply exactly one query");
487        }
488        let StatementParseResult { ast: stmt, sql } = stmts.into_element();
489
490        const EMPTY_PORTAL: &str = "";
491        session_client.start_transaction(Some(1))?;
492        session_client
493            .declare(EMPTY_PORTAL.into(), stmt, sql.to_string())
494            .await?;
495
496        let execute_result = session_client
497            .execute(EMPTY_PORTAL.into(), futures::future::pending(), None)
498            .await?;
499        match execute_result {
500            (ExecuteResponse::SendingRowsStreaming { mut rows, .. }, _) => {
501                // We have to only drop the session client _after_ we read the
502                // result. Otherwise the peek will get cancelled right when we
503                // drop the session client. So we wrap it up in an extra stream
504                // like this, which owns the client and can return it.
505                let owning_response_stream = async_stream::stream! {
506                    while let Some(rows) = rows.next().await {
507                        yield rows;
508                    }
509                    drop(session_client);
510                };
511                Ok(Box::pin(owning_response_stream))
512            }
513            r => bail!("unsupported response type: {r:?}"),
514        }
515    }
516
    /// Returns the metrics associated with the adapter layer.
    ///
    /// This is a borrow of the `Metrics` value the client was constructed
    /// with.
    pub fn metrics(&self) -> &Metrics {
        &self.metrics
    }
521
522    /// The current time according to the [`Client`].
523    pub fn now(&self) -> DateTime<Utc> {
524        to_datetime((self.now)())
525    }
526
527    /// Get a metadata and a channel that can be used to append to a webhook source.
528    pub async fn get_webhook_appender(
529        &self,
530        database: String,
531        schema: String,
532        name: String,
533    ) -> Result<AppendWebhookResponse, AppendWebhookError> {
534        let (tx, rx) = oneshot::channel();
535
536        // Send our request.
537        self.send(Command::GetWebhook {
538            database,
539            schema,
540            name,
541            tx,
542        });
543
544        // Using our one shot channel to get the result, returning an error if the sender dropped.
545        let response = rx
546            .await
547            .map_err(|_| anyhow::anyhow!("failed to receive webhook response"))?;
548
549        response
550    }
551
552    /// Gets the current value of all system variables.
553    pub async fn get_system_vars(&self) -> SystemVars {
554        let (tx, rx) = oneshot::channel();
555        self.send(Command::GetSystemVars { tx });
556        rx.await.expect("coordinator unexpectedly gone")
557    }
558
559    /// Returns a snapshot of the catalog.
560    pub async fn catalog_snapshot(&self) -> Arc<Catalog> {
561        let (tx, rx) = oneshot::channel();
562        self.send(Command::CatalogSnapshot { tx });
563        let CatalogSnapshot { catalog } = rx.await.expect("coordinator unexpectedly gone");
564        catalog
565    }
566
567    /// Reconciles the coordinator's scoped feature-flag working copy towards
568    /// `overrides`. Used by the system-parameter sync loop from continuous
569    /// LaunchDarkly evaluation.
570    ///
571    /// `prune_scope` bounds which objects' rows the reconcile may remove (the
572    /// objects `overrides` was evaluated for). The sync loop passes the live
573    /// objects from its snapshot; `None` is a full replace, used by the
574    /// disabled-feature clear path. See
575    /// [`crate::catalog::Op::UpdateScopedSystemParameters`].
576    pub async fn update_scoped_system_parameters(
577        &self,
578        overrides: ScopedParameters,
579        prune_scope: Option<ScopedParametersScope>,
580    ) {
581        let (tx, rx) = oneshot::channel();
582        self.send(Command::UpdateScopedSystemParameters {
583            overrides,
584            prune_scope,
585            tx,
586        });
587        let _ = rx.await;
588    }
589
590    /// Installs (or replaces) the shared system-parameter frontend on the
591    /// coordinator, letting the create-cluster / create-replica paths resolve a
592    /// new object's scoped overrides synchronously. Sent by the sync loop each
593    /// time it (re)initializes the frontend. Fire-and-forget.
594    pub fn install_scoped_system_parameter_frontend(&self, frontend: Arc<SystemParameterFrontend>) {
595        self.send(Command::InstallScopedSystemParameterFrontend { frontend });
596    }
597
598    #[instrument(level = "debug")]
599    pub(crate) fn send(&self, cmd: Command) {
600        self.inner_cmd_tx
601            .send((OpenTelemetryContext::obtain(), cmd))
602            .expect("coordinator unexpectedly gone");
603    }
604}
605
/// A coordinator client that is bound to a connection.
///
/// Created by [`Client::startup`] once the coordinator has accepted the
/// session.
///
/// See also [`Client`].
pub struct SessionClient {
    // Invariant: inner may only be `None` after the session has been terminated.
    // Once the session is terminated, no communication to the Coordinator
    // should be attempted.
    inner: Option<Client>,
    // Invariant: session may only be `None` during a method call. Every public
    // method must ensure that `Session` is `Some` before it returns.
    session: Option<Session>,
    // Created with `Timeout::new()` at startup.
    // NOTE(review): presumably tracks the session's timeouts; confirm against `Timeout`.
    timeouts: Timeout,
    // Optional Segment client, copied from the originating `Client`; used to
    // report statement parse failures as telemetry events.
    segment_client: Option<mz_segment::Client>,
    // Environment ID copied from the originating `Client`; attached to
    // telemetry events.
    environment_id: EnvironmentId,
    /// Client for frontend peek sequencing; populated at connection startup.
    peek_client: PeekClient,
    /// Whether frontend peek sequencing is enabled; initialized at connection startup.
    // TODO(peek-seq): Currently, this is initialized only at session startup. We'll be able to
    // check the actual feature flag value at every peek (without a Coordinator call) once we'll
    // always have a catalog snapshot at hand.
    pub enable_frontend_peek_sequencing: bool,
}
628
629impl SessionClient {
630    /// Parses a SQL expression, reporting failures as a telemetry event if
631    /// possible.
632    pub fn parse<'a>(
633        &self,
634        sql: &'a str,
635    ) -> Result<Result<Vec<StatementParseResult<'a>>, ParserStatementError>, String> {
636        match mz_sql::parse::parse_with_limit(sql) {
637            Ok(Err(e)) => {
638                self.track_statement_parse_failure(&e);
639                Ok(Err(e))
640            }
641            r => r,
642        }
643    }
644
645    fn track_statement_parse_failure(&self, parse_error: &ParserStatementError) {
646        let session = self.session.as_ref().expect("session invariant violated");
647        let Some(user_id) = session.user().external_metadata.as_ref().map(|m| m.user_id) else {
648            return;
649        };
650        let Some(segment_client) = &self.segment_client else {
651            return;
652        };
653        let Some(statement_kind) = parse_error.statement else {
654            return;
655        };
656        let Some((action, object_type)) = telemetry::analyze_audited_statement(statement_kind)
657        else {
658            return;
659        };
660        let event_type = StatementFailureType::ParseFailure;
661        let event_name = format!(
662            "{} {} {}",
663            object_type.as_title_case(),
664            action.as_title_case(),
665            event_type.as_title_case(),
666        );
667        segment_client.environment_track(
668            &self.environment_id,
669            event_name,
670            json!({
671                "statement_kind": statement_kind,
672                "error": &parse_error.error,
673            }),
674            EventDetails {
675                user_id: Some(user_id),
676                application_name: Some(session.application_name()),
677                ..Default::default()
678            },
679        );
680    }
681
682    // Verify and return the named prepared statement. We need to verify each use
683    // to make sure the prepared statement is still safe to use.
684    pub async fn get_prepared_statement(
685        &mut self,
686        name: &str,
687    ) -> Result<&PreparedStatement, AdapterError> {
688        let catalog = self.catalog_snapshot("get_prepared_statement").await;
689        Coordinator::verify_prepared_statement(&catalog, self.session(), name)?;
690        Ok(self
691            .session()
692            .get_prepared_statement_unverified(name)
693            .expect("must exist"))
694    }
695
696    /// Saves the parsed statement as a prepared statement.
697    ///
698    /// The prepared statement is saved in the connection's [`crate::session::Session`]
699    /// under the specified name.
700    pub async fn prepare(
701        &mut self,
702        name: String,
703        stmt: Option<Statement<Raw>>,
704        sql: String,
705        param_types: Vec<Option<SqlScalarType>>,
706    ) -> Result<(), AdapterError> {
707        let catalog = self.catalog_snapshot("prepare").await;
708
709        // Note: This failpoint is used to simulate a request outliving the external connection
710        // that made it.
711        let mut async_pause = false;
712        (|| {
713            fail::fail_point!("async_prepare", |val| {
714                async_pause = val.map_or(false, |val| val.parse().unwrap_or(false))
715            });
716        })();
717        if async_pause {
718            tokio::time::sleep(Duration::from_secs(1)).await;
719        };
720
721        let desc = Coordinator::describe(&catalog, self.session(), stmt.clone(), param_types)?;
722        let now = self.now();
723        let state_revision = StateRevision {
724            catalog_revision: catalog.transient_revision(),
725            session_state_revision: self.session().state_revision(),
726        };
727        self.session()
728            .set_prepared_statement(name, stmt, sql, desc, state_revision, now);
729        Ok(())
730    }
731
732    /// Binds a statement to a portal.
733    #[mz_ore::instrument(level = "debug")]
734    pub async fn declare(
735        &mut self,
736        name: String,
737        stmt: Statement<Raw>,
738        sql: String,
739    ) -> Result<(), AdapterError> {
740        let catalog = self.catalog_snapshot("declare").await;
741        let param_types = vec![];
742        let desc =
743            Coordinator::describe(&catalog, self.session(), Some(stmt.clone()), param_types)?;
744        let params = vec![];
745        let result_formats = vec![mz_pgwire_common::Format::Text; desc.arity()];
746        let now = self.now();
747        let logging = self.session().mint_logging(sql, Some(&stmt), now);
748        let state_revision = StateRevision {
749            catalog_revision: catalog.transient_revision(),
750            session_state_revision: self.session().state_revision(),
751        };
752        self.session().set_portal(
753            name,
754            desc,
755            Some(stmt),
756            logging,
757            params,
758            result_formats,
759            state_revision,
760        )?;
761        Ok(())
762    }
763
764    /// Executes a previously-bound portal.
765    ///
766    /// Note: the provided `cancel_future` must be cancel-safe as it's polled in a `select!` loop.
767    ///
768    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
769    /// triggering the execution of the underlying query.
770    #[mz_ore::instrument(level = "debug")]
771    pub async fn execute(
772        &mut self,
773        portal_name: String,
774        cancel_future: impl Future<Output = std::io::Error> + Send,
775        outer_ctx_extra: Option<ExecuteContextGuard>,
776    ) -> Result<(ExecuteResponse, Instant), AdapterError> {
777        let execute_started = Instant::now();
778
779        let mut outer_ctx_extra = outer_ctx_extra;
780
781        // Unroll SQL `EXECUTE <prepared> (...)` so the inner statement
782        // flows through `try_frontend_peek` below, rather than being
783        // re-dispatched via `Command::Execute` from the coordinator's
784        // `Plan::Execute` handler. Without this, a prepared statement
785        // would route differently from the same statement issued
786        // directly.
787        //
788        // On a successful unroll, `unroll_sql_execute` also returns a
789        // catalog snapshot (threaded through to avoid taking a second
790        // one) and begins EXECUTE-level statement logging on the outer
791        // portal — so `mz_statement_execution_history` records
792        // `EXECUTE foo (...)`, not the inner SQL — installing the
793        // resulting `ExecuteContextGuard` into `outer_ctx_extra`.
794        let (portal_name, catalog) = self
795            .unroll_sql_execute(portal_name, &mut outer_ctx_extra)
796            .await?;
797
798        // Attempt peek sequencing in the session task.
799        // If unsupported, fall back to the Coordinator path.
800        // TODO(peek-seq): wire up cancel_future
801        let peek_result = self
802            .try_frontend_peek(&portal_name, catalog, &mut outer_ctx_extra)
803            .await?;
804        if let Some(resp) = peek_result {
805            debug!("frontend peek succeeded");
806            // Frontend peek handled the execution and retired outer_ctx_extra if it existed.
807            // No additional work needed here.
808            return Ok((resp, execute_started));
809        } else {
810            debug!("frontend peek did not happen, falling back to `Command::Execute`");
811            // If we bailed out, outer_ctx_extra is still present (if it was originally).
812            // `Command::Execute` will handle it.
813            // (This is not true if we bailed out _after_ the frontend peek sequencing has already
814            // begun its own statement logging. That case would be a bug.)
815        }
816
817        let response = self
818            .send_with_cancel(
819                |tx, session| Command::Execute {
820                    portal_name,
821                    session,
822                    tx,
823                    outer_ctx_extra,
824                },
825                cancel_future,
826            )
827            .await?;
828        Ok((response, execute_started))
829    }
830
831    /// If the named portal binds a SQL `EXECUTE <prepared>`, resolve the
832    /// prepared statement, install a fresh portal for the inner statement
833    /// (carrying the EXECUTE's actual parameter values), and return that
834    /// portal's name so the caller can run `try_frontend_peek` against it.
835    ///
836    /// Only ever unrolls one level: the parser rejects
837    /// `PREPARE foo AS EXECUTE bar` (matching Postgres), so the inner
838    /// statement is guaranteed not to be another `EXECUTE`. A failsafe below
839    /// surfaces an internal error if that invariant is ever violated.
840    ///
841    /// When the portal does not bind an `EXECUTE` — the common case —
842    /// returns the original portal name and `None`, costing only a portal
843    /// lookup. On unroll, also returns the catalog snapshot taken here so
844    /// the caller can thread it into `try_frontend_peek`, which reuses it
845    /// instead of taking its own.
846    async fn unroll_sql_execute(
847        &mut self,
848        portal_name: String,
849        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
850    ) -> Result<(String, Option<Arc<Catalog>>), AdapterError> {
851        let (stmt, params, outer_logging, outer_lifecycle_timestamps) = {
852            let session = self.session.as_ref().expect("SessionClient invariant");
853            let portal = match session.get_portal_unverified(&portal_name) {
854                Some(p) => p,
855                // No portal: let `try_frontend_peek` surface the
856                // standard "missing portal" error.
857                None => return Ok((portal_name, None)),
858            };
859            match &portal.stmt {
860                Some(stmt) => (
861                    Arc::clone(stmt),
862                    portal.parameters.clone(),
863                    Arc::clone(&portal.logging),
864                    portal.lifecycle_timestamps.clone(),
865                ),
866                None => return Ok((portal_name, None)),
867            }
868        };
869
870        // Only EXECUTE statements need unrolling. Bail out before taking a
871        // catalog snapshot in the (overwhelmingly common) non-EXECUTE case.
872        if !matches!(&*stmt, Statement::Execute(_)) {
873            return Ok((portal_name, None));
874        }
875
876        let catalog = self.catalog_snapshot("unroll_sql_execute").await;
877
878        // Validate the outer EXECUTE portal against the (possibly newer)
879        // catalog: ensures the recorded portal description still matches
880        // what describing the EXECUTE would produce now.
881        {
882            let session = self.session.as_mut().expect("SessionClient invariant");
883            Coordinator::verify_portal(&catalog, session, &portal_name)?;
884        }
885
886        // Bump query_total for the outer EXECUTE itself. The inner
887        // statement gets its own increment inside `try_frontend_peek_inner`
888        // (or, on bailout, in the coordinator's `handle_execute`).
889        {
890            let session = self.session.as_ref().expect("SessionClient invariant");
891            session
892                .metrics()
893                .query_total(&[
894                    metrics::session_type_label_value(session.user()),
895                    metrics::statement_type_label_value(&stmt),
896                ])
897                .inc();
898        }
899
900        // Begin EXECUTE-level statement logging up front, so that planning
901        // errors below produce an `Errored` end-event in
902        // `mz_statement_execution_history` rather than no entry at all.
903        //
904        // We pass the *outer* portal's `logging` and pgwire-bound `params`
905        // so the recorded entry shows the user-visible `EXECUTE foo (...)`,
906        // not the inner SQL. The id (if any) moves into `outer_ctx_extra`
907        // below for `try_frontend_peek` to retire; on planning error we
908        // explicitly emit an `Errored` end-event below.
909        let began_outer_logging = outer_ctx_extra.is_none();
910        let logging_id: Option<crate::statement_logging::StatementLoggingId> =
911            if began_outer_logging {
912                let session = self.session.as_mut().expect("SessionClient invariant");
913                let result = self
914                    .peek_client
915                    .statement_logging_frontend
916                    .begin_statement_execution(
917                        session,
918                        &params,
919                        &outer_logging,
920                        catalog.system_config(),
921                        outer_lifecycle_timestamps,
922                    );
923                if let Some((id, began_execution, mseh_update, prepared_statement)) = result {
924                    self.peek_client.log_began_execution(
925                        began_execution,
926                        mseh_update,
927                        prepared_statement,
928                    );
929                    Some(id)
930                } else {
931                    None
932                }
933            } else {
934                None
935            };
936
937        let new_portal_name = match self.install_inner_portal_for_execute(&catalog, &stmt, &params)
938        {
939            Ok(name) => name,
940            Err(err) => {
941                if let Some(id) = logging_id {
942                    self.peek_client.log_ended_execution(
943                        id,
944                        StatementEndedExecutionReason::Errored {
945                            error: err.to_string(),
946                        },
947                    );
948                }
949                return Err(err);
950            }
951        };
952
953        // Hand off to `outer_ctx_extra` whenever we entered the begin path
954        // for the outer EXECUTE — even if `begin_statement_execution`
955        // returned `None` (sampling decided not to sample, or logging is
956        // disabled for the user). This mirrors the original coord path,
957        // which always installs a guard via
958        // `ExecuteContextGuard::new(maybe_uuid, ...)`. Without this, the
959        // inner portal would be treated as a fresh statement by
960        // `try_frontend_peek` (or the fallback `Command::Execute` path)
961        // and re-account its bytes against
962        // `mz_statement_logging_unsampled_bytes`, double-counting the
963        // inner SQL.
964        if began_outer_logging {
965            // Soft invariant: `try_frontend_peek` takes ownership of
966            // `outer_ctx_extra` immediately, so this guard's `Drop` is
967            // unreachable on the normal flow and the dummy channel is
968            // never used. If a panic does fire `Drop` between here and
969            // that takeover, the `Aborted` end-event is silently lost
970            // — an acceptable trade given the panic implies the
971            // connection is going down anyway.
972            let (dummy_tx, _dummy_rx) = mpsc::unbounded_channel();
973            *outer_ctx_extra = Some(ExecuteContextGuard::new(logging_id, dummy_tx));
974        }
975
976        Ok((new_portal_name, Some(catalog)))
977    }
978
979    /// Helper for [`Self::unroll_sql_execute`]: plans the outer
980    /// `Statement::Execute`, verifies the referenced prepared statement, and
981    /// installs a fresh portal carrying the inner statement plus the
982    /// EXECUTE's bound parameter values. Returns the new portal's name.
983    ///
984    /// Split out so [`Self::unroll_sql_execute`] can wrap the fallible work
985    /// in a single error-handling site that emits an `Errored` end-event
986    /// for the EXECUTE-level statement-logging entry.
987    fn install_inner_portal_for_execute(
988        &mut self,
989        catalog: &Arc<Catalog>,
990        stmt: &Arc<Statement<Raw>>,
991        params: &mz_sql::plan::Params,
992    ) -> Result<String, AdapterError> {
993        use mz_sql::plan::Plan;
994
995        let execute_plan = {
996            let session = self.session.as_mut().expect("SessionClient invariant");
997            let conn_catalog = catalog.for_session(session);
998            let (resolved_stmt, resolved_ids) =
999                mz_sql::names::resolve(&conn_catalog, (**stmt).clone())?;
1000            let pcx = session.pcx();
1001            let (plan, _sql_impl_ids) = mz_sql::plan::plan(
1002                Some(pcx),
1003                &conn_catalog,
1004                resolved_stmt,
1005                params,
1006                &resolved_ids,
1007            )?;
1008            match plan {
1009                Plan::Execute(plan) => plan,
1010                other => {
1011                    // Planning a `Statement::Execute` must yield
1012                    // `Plan::Execute`. If it doesn't, the planner
1013                    // contract is broken.
1014                    return Err(AdapterError::Internal(format!(
1015                        "planning Statement::Execute yielded unexpected plan: {:?}",
1016                        mz_sql::plan::PlanKind::from(&other),
1017                    )));
1018                }
1019            }
1020        };
1021
1022        // Verify and install the inner portal. Mirrors
1023        // `Coordinator::sequence_execute`. The new portal carries the inner
1024        // prepared statement's `logging`, but `try_frontend_peek` will see
1025        // `outer_ctx_extra=Some(...)` and inherit the EXECUTE-level logging
1026        // instead of starting fresh from this portal.
1027        let session = self.session.as_mut().expect("SessionClient invariant");
1028        Coordinator::verify_prepared_statement(catalog, session, &execute_plan.name)?;
1029        let ps = session
1030            .get_prepared_statement_unverified(&execute_plan.name)
1031            .expect("verified above");
1032        let inner_stmt = ps.stmt().cloned();
1033        let inner_desc = ps.desc().clone();
1034        let state_revision = ps.state_revision;
1035        let inner_logging = Arc::clone(ps.logging());
1036
1037        // Failsafe: `PREPARE foo AS EXECUTE bar` is rejected by the parser,
1038        // so the resolved inner statement must not be another `EXECUTE`. If
1039        // that ever changes, we'd silently skip frontend sequencing for the
1040        // deeper EXECUTEs — surface it as an internal error instead.
1041        if let Some(inner) = inner_stmt.as_ref() {
1042            if matches!(inner, Statement::Execute(_)) {
1043                return Err(AdapterError::Internal(format!(
1044                    "nested EXECUTE: prepared statement {} resolves to another EXECUTE; \
1045                     parser should reject `PREPARE ... AS EXECUTE ...`",
1046                    execute_plan.name.quoted(),
1047                )));
1048            }
1049        }
1050
1051        session.create_new_portal(
1052            inner_stmt,
1053            inner_logging,
1054            inner_desc,
1055            execute_plan.params,
1056            Vec::new(),
1057            state_revision,
1058        )
1059    }
1060
1061    fn now(&self) -> EpochMillis {
1062        (self.inner().now)()
1063    }
1064
1065    fn now_datetime(&self) -> DateTime<Utc> {
1066        to_datetime(self.now())
1067    }
1068
1069    /// Starts a transaction based on implicit:
1070    /// - `None`: InTransaction
1071    /// - `Some(1)`: Started
1072    /// - `Some(n > 1)`: InTransactionImplicit
1073    /// - `Some(0)`: no change
1074    pub fn start_transaction(&mut self, implicit: Option<usize>) -> Result<(), AdapterError> {
1075        let now = self.now_datetime();
1076        let session = self.session.as_mut().expect("session invariant violated");
1077        let result = match implicit {
1078            None => session.start_transaction(now, None, None),
1079            Some(stmts) => {
1080                session.start_transaction_implicit(now, stmts);
1081                Ok(())
1082            }
1083        };
1084        result
1085    }
1086
1087    /// Ends a transaction. Even if an error is returned, guarantees that the transaction in the
1088    /// session and Coordinator has cleared its state.
1089    #[instrument(level = "debug")]
1090    pub async fn end_transaction(
1091        &mut self,
1092        action: EndTransactionAction,
1093    ) -> Result<ExecuteResponse, AdapterError> {
1094        let res = self
1095            .send(|tx, session| Command::Commit {
1096                action,
1097                session,
1098                tx,
1099            })
1100            .await;
1101        // Commit isn't guaranteed to set the session's state to anything specific, so clear it
1102        // here. It's safe to ignore the returned `TransactionStatus` because that doesn't contain
1103        // any data that the Coordinator must act on for correctness.
1104        let _ = self.session().clear_transaction();
1105        res
1106    }
1107
1108    /// Fails a transaction.
1109    pub fn fail_transaction(&mut self) {
1110        let session = self.session.take().expect("session invariant violated");
1111        let session = session.fail_transaction();
1112        self.session = Some(session);
1113    }
1114
1115    /// Fetches the catalog.
1116    #[instrument(level = "debug")]
1117    pub async fn catalog_snapshot(&self, context: &str) -> Arc<Catalog> {
1118        let start = std::time::Instant::now();
1119        let CatalogSnapshot { catalog } = self
1120            .send_without_session(|tx| Command::CatalogSnapshot { tx })
1121            .await;
1122        self.inner()
1123            .metrics()
1124            .catalog_snapshot_seconds
1125            .with_label_values(&[context])
1126            .observe(start.elapsed().as_secs_f64());
1127        catalog
1128    }
1129
1130    /// Dumps the catalog to a JSON string.
1131    ///
1132    /// No authorization is performed, so access to this function must be limited to internal
1133    /// servers or superusers.
1134    pub async fn dump_catalog(&self) -> Result<CatalogDump, AdapterError> {
1135        let catalog = self.catalog_snapshot("dump_catalog").await;
1136        catalog.dump().map_err(AdapterError::from)
1137    }
1138
1139    /// Checks the catalog for internal consistency, returning a JSON object describing the
1140    /// inconsistencies, if there are any.
1141    ///
1142    /// No authorization is performed, so access to this function must be limited to internal
1143    /// servers or superusers.
1144    pub async fn check_catalog(&self) -> Result<(), serde_json::Value> {
1145        let catalog = self.catalog_snapshot("check_catalog").await;
1146        catalog.check_consistency()
1147    }
1148
1149    /// Checks the coordinator for internal consistency, returning a JSON object describing the
1150    /// inconsistencies, if there are any. This is a superset of checks that check_catalog performs,
1151    ///
1152    /// No authorization is performed, so access to this function must be limited to internal
1153    /// servers or superusers.
1154    pub async fn check_coordinator(&self) -> Result<(), serde_json::Value> {
1155        self.send_without_session(|tx| Command::CheckConsistency { tx })
1156            .await
1157            .map_err(|inconsistencies| {
1158                serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1159                    serde_json::Value::String("failed to serialize inconsistencies".to_string())
1160                })
1161            })
1162    }
1163
1164    pub async fn dump_coordinator_state(&self) -> Result<serde_json::Value, anyhow::Error> {
1165        self.send_without_session(|tx| Command::Dump { tx }).await
1166    }
1167
1168    /// Tells the coordinator a statement has finished execution, in the cases
1169    /// where we have no other reason to communicate with the coordinator.
1170    pub fn retire_execute(
1171        &self,
1172        guard: ExecuteContextGuard,
1173        reason: StatementEndedExecutionReason,
1174    ) {
1175        if !guard.is_trivial() {
1176            let data = guard.defuse();
1177            let cmd = Command::RetireExecute { data, reason };
1178            self.inner().send(cmd);
1179        }
1180    }
1181
1182    /// Sets up a streaming COPY FROM STDIN operation.
1183    ///
1184    /// Sends a command to the coordinator to create a background batch
1185    /// builder task. Returns a [`CopyFromStdinWriter`] that pgwire uses
1186    /// to stream decoded rows.
1187    pub async fn start_copy_from_stdin(
1188        &mut self,
1189        target_id: CatalogItemId,
1190        target_name: String,
1191        columns: Vec<ColumnIndex>,
1192        row_desc: mz_repr::RelationDesc,
1193        params: mz_pgcopy::CopyFormatParams<'static>,
1194    ) -> Result<CopyFromStdinWriter, AdapterError> {
1195        self.send(|tx, session| Command::StartCopyFromStdin {
1196            target_id,
1197            target_name,
1198            columns,
1199            row_desc,
1200            params,
1201            session,
1202            tx,
1203        })
1204        .await
1205    }
1206
1207    /// Commits staged COPY FROM STDIN batches to a table.
1208    ///
1209    /// Adds the pre-built persist batches to the session's transaction
1210    /// operations. The actual commit happens when the transaction ends.
1211    pub fn stage_copy_from_stdin_batches(
1212        &mut self,
1213        target_id: CatalogItemId,
1214        batches: Vec<mz_persist_client::batch::ProtoBatch>,
1215    ) -> Result<(), AdapterError> {
1216        use crate::session::{TransactionOps, WriteOp};
1217        use mz_storage_client::client::TableData;
1218
1219        self.session()
1220            .add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
1221                id: target_id,
1222                rows: TableData::Batches(batches.into()),
1223            }]))?;
1224        Ok(())
1225    }
1226
1227    /// Gets the current value of all system variables.
1228    pub async fn get_system_vars(&self) -> SystemVars {
1229        self.inner().get_system_vars().await
1230    }
1231
1232    /// Updates the specified system variables to the specified values.
1233    pub async fn set_system_vars(
1234        &mut self,
1235        vars: BTreeMap<String, String>,
1236    ) -> Result<(), AdapterError> {
1237        let conn_id = self.session().conn_id().clone();
1238        self.send_without_session(|tx| Command::SetSystemVars { vars, conn_id, tx })
1239            .await
1240    }
1241
1242    /// Injects audit events into the catalog via the coordinator.
1243    ///
1244    /// No authorization is performed, so access to this function must be limited to internal
1245    /// servers or superusers.
1246    pub async fn inject_audit_events(
1247        &mut self,
1248        events: Vec<crate::catalog::InjectedAuditEvent>,
1249    ) -> Result<(), AdapterError> {
1250        let conn_id = self.session().conn_id().clone();
1251        self.send_without_session(|tx| Command::InjectAuditEvents {
1252            events,
1253            conn_id,
1254            tx,
1255        })
1256        .await
1257    }
1258
1259    /// Terminates the client session.
1260    pub async fn terminate(&mut self) {
1261        let conn_id = self.session().conn_id().clone();
1262        let res = self
1263            .send_without_session(|tx| Command::Terminate {
1264                conn_id,
1265                tx: Some(tx),
1266            })
1267            .await;
1268        if let Err(e) = res {
1269            // Nothing we can do to handle a failed terminate so we just log and ignore it.
1270            error!("Unable to terminate session: {e:?}");
1271        }
1272        // Prevent any communication with Coordinator after session is terminated.
1273        self.inner = None;
1274    }
1275
1276    /// Returns a mutable reference to the session bound to this client.
1277    pub fn session(&mut self) -> &mut Session {
1278        self.session.as_mut().expect("session invariant violated")
1279    }
1280
1281    /// Returns a reference to the inner client.
1282    pub fn inner(&self) -> &Client {
1283        self.inner.as_ref().expect("inner invariant violated")
1284    }
1285
1286    async fn send_without_session<T, F>(&self, f: F) -> T
1287    where
1288        F: FnOnce(oneshot::Sender<T>) -> Command,
1289    {
1290        let (tx, rx) = oneshot::channel();
1291        self.inner().send(f(tx));
1292        rx.await.expect("sender dropped")
1293    }
1294
1295    #[instrument(level = "debug")]
1296    async fn send<T, F>(&mut self, f: F) -> Result<T, AdapterError>
1297    where
1298        F: FnOnce(oneshot::Sender<Response<T>>, Session) -> Command,
1299    {
1300        self.send_with_cancel(f, futures::future::pending()).await
1301    }
1302
1303    /// Send a [`Command`] to the Coordinator, with the ability to cancel the command.
1304    ///
1305    /// Note: the provided `cancel_future` must be cancel-safe as it's polled in a `select!` loop.
1306    #[instrument(level = "debug")]
1307    async fn send_with_cancel<T, F>(
1308        &mut self,
1309        f: F,
1310        cancel_future: impl Future<Output = std::io::Error> + Send,
1311    ) -> Result<T, AdapterError>
1312    where
1313        F: FnOnce(oneshot::Sender<Response<T>>, Session) -> Command,
1314    {
1315        let session = self.session.take().expect("session invariant violated");
1316        let mut typ = None;
1317        let application_name = session.application_name();
1318        let name_hint = ApplicationNameHint::from_str(application_name);
1319        let conn_id = session.conn_id().clone();
1320        let (tx, rx) = oneshot::channel();
1321
1322        // Destructure self so we can hold a mutable reference to the inner client and session at
1323        // the same time.
1324        let Self {
1325            inner: inner_client,
1326            session: client_session,
1327            ..
1328        } = self;
1329
1330        // TODO(parkmycar): Leaking this invariant here doesn't feel great, but calling
1331        // `self.client()` doesn't work because then Rust takes a borrow on the entirity of self.
1332        let inner_client = inner_client.as_ref().expect("inner invariant violated");
1333
1334        // ~~SPOOKY ZONE~~
1335        //
1336        // This guard prevents a race where a `Session` is returned on `rx` but never placed
1337        // back in `self` because the Future returned by this function is concurrently dropped
1338        // with the Coordinator sending a response.
1339        let mut guarded_rx = rx.with_guard(|response: Response<_>| {
1340            *client_session = Some(response.session);
1341        });
1342
1343        inner_client.send({
1344            let cmd = f(tx, session);
1345            // Measure the success and error rate of certain commands:
1346            // - declare reports success of SQL statement planning
1347            // - execute reports success of dataflow execution
1348            match cmd {
1349                Command::Execute { .. } => typ = Some("execute"),
1350                Command::GetWebhook { .. } => typ = Some("webhook"),
1351                Command::StartCopyFromStdin { .. }
1352                | Command::Startup { .. }
1353                | Command::AuthenticatePassword { .. }
1354                | Command::AuthenticateGetSASLChallenge { .. }
1355                | Command::AuthenticateVerifySASLProof { .. }
1356                | Command::CheckRoleCanLogin { .. }
1357                | Command::CatalogSnapshot { .. }
1358                | Command::Commit { .. }
1359                | Command::CancelRequest { .. }
1360                | Command::PrivilegedCancelRequest { .. }
1361                | Command::GetSystemVars { .. }
1362                | Command::SetSystemVars { .. }
1363                | Command::UpdateScopedSystemParameters { .. }
1364                | Command::InstallScopedSystemParameterFrontend { .. }
1365                | Command::Terminate { .. }
1366                | Command::RetireExecute { .. }
1367                | Command::CheckConsistency { .. }
1368                | Command::Dump { .. }
1369                | Command::GetComputeInstanceClient { .. }
1370                | Command::GetOracle { .. }
1371                | Command::DetermineRealTimeRecentTimestamp { .. }
1372                | Command::GetTransactionReadHoldsBundle { .. }
1373                | Command::StoreTransactionReadHolds { .. }
1374                | Command::ExecuteSlowPathPeek { .. }
1375                | Command::ExecuteSubscribe { .. }
1376                | Command::CopyToPreflight { .. }
1377                | Command::ExecuteCopyTo { .. }
1378                | Command::ExecuteSideEffectingFunc { .. }
1379                | Command::RegisterFrontendPeek { .. }
1380                | Command::UnregisterFrontendPeek { .. }
1381                | Command::ExplainTimestamp { .. }
1382                | Command::FrontendStatementLogging(..)
1383                | Command::InjectAuditEvents { .. } => {}
1384            };
1385            cmd
1386        });
1387
1388        let mut cancel_future = pin::pin!(cancel_future);
1389        let mut cancelled = false;
1390        loop {
1391            tokio::select! {
1392                res = &mut guarded_rx => {
1393                    // We received a result, so drop our guard to drop our borrows.
1394                    drop(guarded_rx);
1395
1396                    let res = res.expect("sender dropped");
1397                    let status = res.result.is_ok().then_some("success").unwrap_or("error");
1398                    if let Err(err) = res.result.as_ref() {
1399                        if name_hint.should_trace_errors() {
1400                            tracing::warn!(?err, ?name_hint, "adapter response error");
1401                        }
1402                    }
1403
1404                    if let Some(typ) = typ {
1405                        inner_client
1406                            .metrics
1407                            .commands
1408                            .with_label_values(&[typ, status, name_hint.as_str()])
1409                            .inc();
1410                    }
1411                    *client_session = Some(res.session);
1412                    return res.result;
1413                },
1414                _err = &mut cancel_future, if !cancelled => {
1415                    cancelled = true;
1416                    inner_client.send(Command::PrivilegedCancelRequest {
1417                        conn_id: conn_id.clone(),
1418                    });
1419                }
1420            };
1421        }
1422    }
1423
1424    pub fn add_idle_in_transaction_session_timeout(&mut self) {
1425        let session = self.session();
1426        let timeout_dur = session.vars().idle_in_transaction_session_timeout();
1427        if !timeout_dur.is_zero() {
1428            let timeout_dur = timeout_dur.clone();
1429            if let Some(txn) = session.transaction().inner() {
1430                let txn_id = txn.id.clone();
1431                let timeout = TimeoutType::IdleInTransactionSession(txn_id);
1432                self.timeouts.add_timeout(timeout, timeout_dur);
1433            }
1434        }
1435    }
1436
1437    pub fn remove_idle_in_transaction_session_timeout(&mut self) {
1438        let session = self.session();
1439        if let Some(txn) = session.transaction().inner() {
1440            let txn_id = txn.id.clone();
1441            self.timeouts
1442                .remove_timeout(&TimeoutType::IdleInTransactionSession(txn_id));
1443        }
1444    }
1445
1446    /// # Cancel safety
1447    ///
1448    /// This method is cancel safe. If `recv` is used as the event in a
1449    /// `tokio::select!` statement and some other branch
1450    /// completes first, it is guaranteed that no messages were received on this
1451    /// channel.
1452    pub async fn recv_timeout(&mut self) -> Option<TimeoutType> {
1453        self.timeouts.recv().await
1454    }
1455
1456    /// Attempt to sequence a peek from the session task.
1457    ///
1458    /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
1459    /// Coordinator's sequencing. If it returns an error, it should be returned to the user.
1460    ///
1461    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
1462    /// triggering the execution of the underlying query.
1463    pub(crate) async fn try_frontend_peek(
1464        &mut self,
1465        portal_name: &str,
1466        catalog: Option<Arc<Catalog>>,
1467        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
1468    ) -> Result<Option<ExecuteResponse>, AdapterError> {
1469        if self.enable_frontend_peek_sequencing {
1470            let session = self.session.as_mut().expect("SessionClient invariant");
1471            self.peek_client
1472                .try_frontend_peek(portal_name, catalog, session, outer_ctx_extra)
1473                .await
1474        } else {
1475            Ok(None)
1476        }
1477    }
1478}
1479
1480impl Drop for SessionClient {
1481    fn drop(&mut self) {
1482        // We may not have a session if this client was dropped while awaiting
1483        // a response. In this case, it is the coordinator's responsibility to
1484        // terminate the session.
1485        if let Some(session) = self.session.take() {
1486            // We may not have a connection to the Coordinator if the session was
1487            // prematurely terminated, for example due to a timeout.
1488            if let Some(inner) = &self.inner {
1489                inner.send(Command::Terminate {
1490                    conn_id: session.conn_id().clone(),
1491                    tx: None,
1492                })
1493            }
1494        }
1495    }
1496}
1497
1498#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
1499pub enum TimeoutType {
1500    IdleInTransactionSession(TransactionId),
1501}
1502
1503impl Display for TimeoutType {
1504    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1505        match self {
1506            TimeoutType::IdleInTransactionSession(txn_id) => {
1507                writeln!(f, "Idle in transaction session for transaction '{txn_id}'")
1508            }
1509        }
1510    }
1511}
1512
1513impl From<TimeoutType> for AdapterError {
1514    fn from(timeout: TimeoutType) -> Self {
1515        match timeout {
1516            TimeoutType::IdleInTransactionSession(_) => {
1517                AdapterError::IdleInTransactionSessionTimeout
1518            }
1519        }
1520    }
1521}
1522
1523struct Timeout {
1524    tx: mpsc::UnboundedSender<TimeoutType>,
1525    rx: mpsc::UnboundedReceiver<TimeoutType>,
1526    active_timeouts: BTreeMap<TimeoutType, AbortOnDropHandle<()>>,
1527}
1528
1529impl Timeout {
1530    fn new() -> Self {
1531        let (tx, rx) = mpsc::unbounded_channel();
1532        Timeout {
1533            tx,
1534            rx,
1535            active_timeouts: BTreeMap::new(),
1536        }
1537    }
1538
1539    /// # Cancel safety
1540    ///
1541    /// This method is cancel safe. If `recv` is used as the event in a
1542    /// `tokio::select!` statement and some other branch
1543    /// completes first, it is guaranteed that no messages were received on this
1544    /// channel.
1545    ///
1546    /// <https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety>
1547    async fn recv(&mut self) -> Option<TimeoutType> {
1548        self.rx.recv().await
1549    }
1550
1551    fn add_timeout(&mut self, timeout: TimeoutType, duration: Duration) {
1552        let tx = self.tx.clone();
1553        let timeout_key = timeout.clone();
1554        let handle = mz_ore::task::spawn(|| format!("{timeout_key}"), async move {
1555            tokio::time::sleep(duration).await;
1556            let _ = tx.send(timeout);
1557        })
1558        .abort_on_drop();
1559        self.active_timeouts.insert(timeout_key, handle);
1560    }
1561
1562    fn remove_timeout(&mut self, timeout: &TimeoutType) {
1563        self.active_timeouts.remove(timeout);
1564
1565        // Remove the timeout from the rx queue if it exists.
1566        let mut timeouts = Vec::new();
1567        while let Ok(pending_timeout) = self.rx.try_recv() {
1568            if timeout != &pending_timeout {
1569                timeouts.push(pending_timeout);
1570            }
1571        }
1572        for pending_timeout in timeouts {
1573            self.tx.send(pending_timeout).expect("rx is in this struct");
1574        }
1575    }
1576}
1577
1578/// A wrapper around a Stream of PeekResponseUnary that records when it sees the
1579/// first row data in the given histogram. It also keeps track of whether we have already observed
1580/// the end of the underlying stream.
1581#[derive(Derivative)]
1582#[derivative(Debug)]
1583pub struct RecordFirstRowStream {
1584    /// The underlying stream of rows.
1585    #[derivative(Debug = "ignore")]
1586    pub rows: Box<dyn Stream<Item = PeekResponseUnary> + Unpin + Send + Sync>,
1587    /// The Instant when execution started.
1588    pub execute_started: Instant,
1589    /// The histogram where the time since `execute_started` will be recorded when we see the first
1590    /// row.
1591    pub time_to_first_row_seconds: Histogram,
1592    /// Whether we've seen any rows.
1593    pub saw_rows: bool,
1594    /// The Instant when we saw the first row.
1595    pub recorded_first_row_instant: Option<Instant>,
1596    /// Whether we have already observed the end of the underlying stream.
1597    pub no_more_rows: bool,
1598    /// Whether the first-to-last-byte metric has already been recorded for this stream.
1599    pub metric_recorded: bool,
1600}
1601
1602impl RecordFirstRowStream {
1603    /// Create a new [`RecordFirstRowStream`]
1604    pub fn new(
1605        rows: Box<dyn Stream<Item = PeekResponseUnary> + Unpin + Send + Sync>,
1606        execute_started: Instant,
1607        client: &SessionClient,
1608        instance_id: Option<ComputeInstanceId>,
1609        strategy: Option<StatementExecutionStrategy>,
1610    ) -> Self {
1611        let histogram = Self::histogram(client, instance_id, strategy);
1612        Self {
1613            rows,
1614            execute_started,
1615            time_to_first_row_seconds: histogram,
1616            saw_rows: false,
1617            recorded_first_row_instant: None,
1618            no_more_rows: false,
1619            metric_recorded: false,
1620        }
1621    }
1622
1623    fn histogram(
1624        client: &SessionClient,
1625        instance_id: Option<ComputeInstanceId>,
1626        strategy: Option<StatementExecutionStrategy>,
1627    ) -> Histogram {
1628        let session = client.session.as_ref().expect("session invariant");
1629        let isolation_level = *session.vars().transaction_isolation();
1630        let name_hint = ApplicationNameHint::from_str(session.application_name());
1631        let instance = match instance_id {
1632            Some(i) => Cow::Owned(i.to_string()),
1633            None => Cow::Borrowed("none"),
1634        };
1635        let strategy = match strategy {
1636            Some(s) => s.name(),
1637            None => "none",
1638        };
1639
1640        client
1641            .inner()
1642            .metrics()
1643            .time_to_first_row_seconds
1644            .with_label_values(&[
1645                instance.as_ref(),
1646                isolation_level.as_variant_str(),
1647                strategy,
1648                name_hint.as_str(),
1649            ])
1650    }
1651
1652    /// If you want to match [`RecordFirstRowStream`]'s logic but don't need
1653    /// a UnboundedReceiver, you can tell it when to record an observation.
1654    pub fn record(
1655        execute_started: Instant,
1656        client: &SessionClient,
1657        instance_id: Option<ComputeInstanceId>,
1658        strategy: Option<StatementExecutionStrategy>,
1659    ) {
1660        Self::histogram(client, instance_id, strategy)
1661            .observe(execute_started.elapsed().as_secs_f64());
1662    }
1663
1664    pub async fn recv(&mut self) -> Option<PeekResponseUnary> {
1665        let msg = self.rows.next().await;
1666        if !self.saw_rows && matches!(msg, Some(PeekResponseUnary::Rows(_))) {
1667            self.saw_rows = true;
1668            self.time_to_first_row_seconds
1669                .observe(self.execute_started.elapsed().as_secs_f64());
1670            self.recorded_first_row_instant = Some(Instant::now());
1671        }
1672        if msg.is_none() {
1673            self.no_more_rows = true;
1674        }
1675        msg
1676    }
1677}