// mz_adapter/client.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::collections::BTreeMap;
12use std::fmt::{Debug, Display, Formatter};
13use std::future::Future;
14use std::pin::{self, Pin};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use anyhow::bail;
19use chrono::{DateTime, Utc};
20use derivative::Derivative;
21use futures::{Stream, StreamExt};
22use itertools::Itertools;
23use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
24use mz_auth::password::Password;
25use mz_auth::{Authenticated, AuthenticatorKind};
26use mz_build_info::BuildInfo;
27use mz_compute_types::ComputeInstanceId;
28use mz_ore::channel::OneshotReceiverExt;
29use mz_ore::collections::CollectionExt;
30use mz_ore::id_gen::{IdAllocator, IdAllocatorInnerBitSet, MAX_ORG_ID, org_id_conn_bits};
31use mz_ore::instrument;
32use mz_ore::now::{EpochMillis, NowFn, to_datetime};
33use mz_ore::str::StrExt;
34use mz_ore::task::AbortOnDropHandle;
35use mz_ore::thread::JoinOnDropHandle;
36use mz_ore::tracing::OpenTelemetryContext;
37use mz_repr::user::InternalUserMetadata;
38use mz_repr::{CatalogItemId, ColumnIndex, SqlScalarType};
39use mz_sql::ast::{Raw, Statement};
40use mz_sql::catalog::{EnvironmentId, SessionCatalog};
41use mz_sql::session::hint::ApplicationNameHint;
42use mz_sql::session::metadata::SessionMetadata;
43use mz_sql::session::user::SUPPORT_USER;
44use mz_sql::session::vars::{
45    CLUSTER, ENABLE_FRONTEND_PEEK_SEQUENCING, OwnedVarInput, SystemVars, Var,
46};
47use mz_sql_parser::parser::{ParserStatementError, StatementParseResult};
48use prometheus::Histogram;
49use serde_json::json;
50use tokio::sync::{mpsc, oneshot};
51use tracing::{debug, error};
52use uuid::Uuid;
53
54use crate::catalog::Catalog;
55use crate::command::{
56    CatalogDump, CatalogSnapshot, Command, CopyFromStdinWriter, ExecuteResponse, Response,
57    SASLChallengeResponse, SASLVerifyProofResponse, SuperuserAttribute,
58};
59use crate::coord::{Coordinator, ExecuteContextGuard};
60use crate::error::AdapterError;
61use crate::metrics::{self, Metrics};
62use crate::session::{
63    EndTransactionAction, PreparedStatement, Session, SessionConfig, StateRevision, TransactionId,
64};
65use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
66use crate::telemetry::{self, EventDetails, SegmentClientExt, StatementFailureType};
67use crate::webhook::AppendWebhookResponse;
68use crate::{AdapterNotice, AppendWebhookError, PeekClient, PeekResponseUnary, StartupResponse};
69
/// A handle to a running coordinator.
///
/// The coordinator runs on its own thread. Dropping the handle will wait for
/// the coordinator's thread to exit, which will only occur after all
/// outstanding [`Client`]s for the coordinator have dropped.
pub struct Handle {
    // Generated at coordinator boot; stable for the coordinator's lifetime.
    pub(crate) session_id: Uuid,
    // When the coordinator booted; see [`Handle::start_instant`].
    pub(crate) start_instant: Instant,
    // Held only for its `Drop`: joins the coordinator thread when the handle
    // is dropped.
    pub(crate) _thread: JoinOnDropHandle<()>,
}
80
81impl Handle {
82    /// Returns the session ID associated with this coordinator.
83    ///
84    /// The session ID is generated on coordinator boot. It lasts for the
85    /// lifetime of the coordinator. Restarting the coordinator will result
86    /// in a new session ID.
87    pub fn session_id(&self) -> Uuid {
88        self.session_id
89    }
90
91    /// Returns the instant at which the coordinator booted.
92    pub fn start_instant(&self) -> Instant {
93        self.start_instant
94    }
95}
96
/// A coordinator client.
///
/// A coordinator client is a simple handle to a communication channel with the
/// coordinator. It can be cheaply cloned.
///
/// Clients keep the coordinator alive. The coordinator will not exit until all
/// outstanding clients have dropped.
#[derive(Debug, Clone)]
pub struct Client {
    // Build metadata for the running binary; threaded into new sessions.
    build_info: &'static BuildInfo,
    // Channel to the coordinator; every command carries the sender's
    // OpenTelemetry context so tracing spans continue across the channel.
    inner_cmd_tx: mpsc::UnboundedSender<(OpenTelemetryContext, Command)>,
    // Allocates connection IDs; see the layout comment in `Client::new`.
    id_alloc: IdAllocator<IdAllocatorInnerBitSet>,
    // Wall-clock source, injectable for tests.
    now: NowFn,
    metrics: Metrics,
    environment_id: EnvironmentId,
    // Optional telemetry sink; `None` disables event reporting.
    segment_client: Option<mz_segment::Client>,
}
114
115impl Client {
116    pub(crate) fn new(
117        build_info: &'static BuildInfo,
118        cmd_tx: mpsc::UnboundedSender<(OpenTelemetryContext, Command)>,
119        metrics: Metrics,
120        now: NowFn,
121        environment_id: EnvironmentId,
122        segment_client: Option<mz_segment::Client>,
123    ) -> Client {
124        // Connection ids are 32 bits and have 3 parts.
125        // 1. MSB bit is always 0 because these are interpreted as an i32, and it is possible some
126        //    driver will not handle a negative id since postgres has never produced one because it
127        //    uses process ids.
128        // 2. Next 12 bits are the lower 12 bits of the org id. This allows balancerd to route
129        //    incoming cancel messages to a subset of the environments.
130        // 3. Last 19 bits are random.
131        let env_lower = org_id_conn_bits(&environment_id.organization_id());
132        Client {
133            build_info,
134            inner_cmd_tx: cmd_tx,
135            id_alloc: IdAllocator::new(1, MAX_ORG_ID, env_lower),
136            now,
137            metrics,
138            environment_id,
139            segment_client,
140        }
141    }
142
143    /// Allocates a client for an incoming connection.
144    pub fn new_conn_id(&self) -> Result<ConnectionId, AdapterError> {
145        self.id_alloc.alloc().ok_or(AdapterError::IdExhaustionError)
146    }
147
148    /// Creates a new session associated with this client for the given user.
149    ///
150    /// It is the caller's responsibility to have authenticated the user.
151    /// We pass in an Authenticated marker as a guardrail to ensure the
152    /// user has authenticated with an authenticator before creating a session.
153    pub fn new_session(&self, config: SessionConfig, _authenticated: Authenticated) -> Session {
154        // We use the system clock to determine when a session connected to Materialize. This is not
155        // intended to be 100% accurate and correct, so we don't burden the timestamp oracle with
156        // generating a more correct timestamp.
157        Session::new(self.build_info, config, self.metrics().session_metrics())
158    }
159
160    /// Used by [mz_auth::AuthenticatorKind::Password]
161    /// to verify the provided user's password against the
162    /// stored credentials in the catalog.
163    pub async fn authenticate(
164        &self,
165        user: &String,
166        password: &Password,
167    ) -> Result<Authenticated, AdapterError> {
168        let (tx, rx) = oneshot::channel();
169        self.send(Command::AuthenticatePassword {
170            role_name: user.to_string(),
171            password: Some(password.clone()),
172            tx,
173        });
174        rx.await.expect("sender dropped")?;
175        Ok(Authenticated)
176    }
177
178    /// Used by [mz_auth::AuthenticatorKind::Sasl] for SASL-SCRAM authentication.
179    /// This is used prior to [Client::verify_sasl_proof].
180    pub async fn generate_sasl_challenge(
181        &self,
182        user: &String,
183        client_nonce: &String,
184    ) -> Result<SASLChallengeResponse, AdapterError> {
185        let (tx, rx) = oneshot::channel();
186        self.send(Command::AuthenticateGetSASLChallenge {
187            role_name: user.to_string(),
188            nonce: client_nonce.to_string(),
189            tx,
190        });
191        let response = rx.await.expect("sender dropped")?;
192        Ok(response)
193    }
194
195    /// Used by [mz_auth::AuthenticatorKind::Sasl] for SASL-SCRAM authentication.
196    /// This is used after [Client::generate_sasl_challenge].
197    pub async fn verify_sasl_proof(
198        &self,
199        user: &String,
200        proof: &String,
201        nonce: &String,
202        mock_hash: &String,
203    ) -> Result<(SASLVerifyProofResponse, Authenticated), AdapterError> {
204        let (tx, rx) = oneshot::channel();
205        self.send(Command::AuthenticateVerifySASLProof {
206            role_name: user.to_string(),
207            proof: proof.to_string(),
208            auth_message: nonce.to_string(),
209            mock_hash: mock_hash.to_string(),
210            tx,
211        });
212        let response = rx.await.expect("sender dropped")?;
213        Ok((response, Authenticated))
214    }
215
216    /// Checks if a role exists and has the `LOGIN` attribute.
217    pub async fn role_can_login(&self, role_name: &str) -> Result<(), AdapterError> {
218        let (tx, rx) = oneshot::channel();
219        self.send(Command::CheckRoleCanLogin {
220            role_name: role_name.to_string(),
221            tx,
222        });
223        rx.await.expect("sender dropped")
224    }
225
226    /// Upgrades this client to a session client.
227    ///
228    /// A session is a connection that has successfully negotiated parameters,
229    /// like the user. Most coordinator operations are available only after
230    /// upgrading a connection to a session.
231    ///
232    /// Returns a new client that is bound to the session and a response
233    /// containing various details about the startup.
234    #[mz_ore::instrument(level = "debug")]
235    pub async fn startup(&self, session: Session) -> Result<SessionClient, AdapterError> {
236        let user = session.user().clone();
237        let conn_id = session.conn_id().clone();
238        let secret_key = session.secret_key();
239        let uuid = session.uuid();
240        let client_ip = session.client_ip();
241        let application_name = session.application_name().into();
242        let notice_tx = session.retain_notice_transmitter();
243
244        let (tx, rx) = oneshot::channel();
245
246        // ~~SPOOKY ZONE~~
247        //
248        // This guard prevents a race where the startup command finishes, but the Future returned
249        // by this function is concurrently dropped, so we never create a `SessionClient` and thus
250        // never cleanup the initialized Session.
251        let rx = rx.with_guard(|_| {
252            self.send(Command::Terminate {
253                conn_id: conn_id.clone(),
254                tx: None,
255            });
256        });
257
258        self.send(Command::Startup {
259            tx,
260            user,
261            conn_id: conn_id.clone(),
262            secret_key,
263            uuid,
264            client_ip: client_ip.copied(),
265            application_name,
266            notice_tx,
267        });
268
269        // When startup fails, no need to call terminate (handle_startup does this). Delay creating
270        // the client until after startup to sidestep the panic in its `Drop` implementation.
271        let response = rx.await.expect("sender dropped")?;
272
273        // Create the client as soon as startup succeeds (before any await points) so its `Drop` can
274        // handle termination.
275        // Build the PeekClient with controller handles returned from startup.
276        let StartupResponse {
277            role_id,
278            write_notify,
279            session_defaults,
280            catalog,
281            storage_collections,
282            transient_id_gen,
283            optimizer_metrics,
284            persist_client,
285            statement_logging_frontend,
286            superuser_attribute,
287        } = response;
288
289        let peek_client = PeekClient::new(
290            self.clone(),
291            storage_collections,
292            transient_id_gen,
293            optimizer_metrics,
294            persist_client,
295            statement_logging_frontend,
296        );
297
298        let mut client = SessionClient {
299            inner: Some(self.clone()),
300            session: Some(session),
301            timeouts: Timeout::new(),
302            environment_id: self.environment_id.clone(),
303            segment_client: self.segment_client.clone(),
304            peek_client,
305            enable_frontend_peek_sequencing: false, // initialized below, once we have a ConnCatalog
306        };
307
308        let session = client.session();
309
310        // Apply the superuser attribute to the session's user if
311        // it exists.
312        if let SuperuserAttribute(Some(superuser)) = superuser_attribute {
313            session.apply_internal_user_metadata(InternalUserMetadata { superuser });
314        }
315
316        session.initialize_role_metadata(role_id);
317        let vars_mut = session.vars_mut();
318        for (name, val) in session_defaults {
319            if let Err(err) = vars_mut.set_default(&name, val.borrow()) {
320                // Note: erroring here is unexpected, but we don't want to panic if somehow our
321                // assumptions are wrong.
322                tracing::error!("failed to set peristed default, {err:?}");
323            }
324        }
325        session
326            .vars_mut()
327            .end_transaction(EndTransactionAction::Commit);
328
329        // Stash the future that notifies us of builtin table writes completing, we'll block on
330        // this future before allowing queries from this session against relevant relations.
331        //
332        // Note: We stash the future as opposed to waiting on it here to prevent blocking session
333        // creation on builtin table updates. This improves the latency for session creation and
334        // reduces scheduling load on any dataflows that read from these builtin relations, since
335        // it allows updates to be batched.
336        session.set_builtin_table_updates(write_notify);
337
338        let catalog = catalog.for_session(session);
339
340        let cluster_active = session.vars().cluster().to_string();
341        if session.vars().welcome_message() {
342            let cluster_info = if catalog.resolve_cluster(Some(&cluster_active)).is_err() {
343                format!("{cluster_active} (does not exist)")
344            } else {
345                cluster_active.to_string()
346            };
347
348            // Emit a welcome message, optimized for readability by humans using
349            // interactive tools. If you change the message, make sure that it
350            // formats nicely in both `psql` and the console's SQL shell.
351            session.add_notice(AdapterNotice::Welcome(format!(
352                "connected to Materialize v{}
353  Environment ID: {}
354  Region: {}
355  User: {}
356  Cluster: {}
357  Database: {}
358  {}
359  Session UUID: {}
360
361Issue a SQL query to get started. Need help?
362  View documentation: https://materialize.com/s/docs
363  Join our Slack community: https://materialize.com/s/chat
364    ",
365                session.vars().build_info().semver_version(),
366                self.environment_id,
367                self.environment_id.region(),
368                session.vars().user().name,
369                cluster_info,
370                session.vars().database(),
371                match session.vars().search_path() {
372                    [schema] => format!("Schema: {}", schema),
373                    schemas => format!(
374                        "Search path: {}",
375                        schemas.iter().map(|id| id.to_string()).join(", ")
376                    ),
377                },
378                session.uuid(),
379            )));
380        }
381
382        if session.vars().current_object_missing_warnings() {
383            if catalog.active_database().is_none() {
384                let db = session.vars().database().into();
385                session.add_notice(AdapterNotice::UnknownSessionDatabase(db));
386            }
387        }
388
389        // Users stub their toe on their default cluster not existing, so we provide a notice to
390        // help guide them on what do to.
391        let cluster_var = session
392            .vars()
393            .inspect(CLUSTER.name())
394            .expect("cluster should exist");
395        if session.vars().current_object_missing_warnings()
396            && catalog.resolve_cluster(Some(&cluster_active)).is_err()
397        {
398            let cluster_notice = 'notice: {
399                if cluster_var.inspect_session_value().is_some() {
400                    break 'notice Some(AdapterNotice::DefaultClusterDoesNotExist {
401                        name: cluster_active,
402                        kind: "session",
403                        suggested_action: "Pick an extant cluster with SET CLUSTER = name. Run SHOW CLUSTERS to see available clusters.".into(),
404                    });
405                }
406
407                let role_default = catalog.get_role(catalog.active_role_id());
408                let role_cluster = match role_default.vars().get(CLUSTER.name()) {
409                    Some(OwnedVarInput::Flat(name)) => Some(name),
410                    None => None,
411                    // This is unexpected!
412                    Some(v @ OwnedVarInput::SqlSet(_)) => {
413                        tracing::warn!(?v, "SqlSet found for cluster Role Default");
414                        break 'notice None;
415                    }
416                };
417
418                let alter_role = "with `ALTER ROLE <role> SET cluster TO <cluster>;`";
419                match role_cluster {
420                    // If there is no default, suggest a Role default.
421                    None => Some(AdapterNotice::DefaultClusterDoesNotExist {
422                        name: cluster_active,
423                        kind: "system",
424                        suggested_action: format!(
425                            "Set a default cluster for the current role {alter_role}."
426                        ),
427                    }),
428                    // If the default does not exist, suggest to change it.
429                    Some(_) => Some(AdapterNotice::DefaultClusterDoesNotExist {
430                        name: cluster_active,
431                        kind: "role",
432                        suggested_action: format!(
433                            "Change the default cluster for the current role {alter_role}."
434                        ),
435                    }),
436                }
437            };
438
439            if let Some(notice) = cluster_notice {
440                session.add_notice(notice);
441            }
442        }
443
444        client.enable_frontend_peek_sequencing = ENABLE_FRONTEND_PEEK_SEQUENCING
445            .require(catalog.system_vars())
446            .is_ok();
447
448        Ok(client)
449    }
450
451    /// Cancels the query currently running on the specified connection.
452    pub fn cancel_request(&self, conn_id: ConnectionIdType, secret_key: u32) {
453        self.send(Command::CancelRequest {
454            conn_id,
455            secret_key,
456        });
457    }
458
    /// Executes a single SQL statement that returns rows as the
    /// `mz_support` user.
    ///
    /// Returns a stream of row results. The stream owns the internal session
    /// client so the peek is not cancelled before all rows are read.
    pub async fn support_execute_one(
        &self,
        sql: &str,
    ) -> Result<Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send>>, anyhow::Error> {
        // Connect to the coordinator.
        let conn_id = self.new_conn_id()?;
        let session = self.new_session(
            SessionConfig {
                conn_id,
                uuid: Uuid::new_v4(),
                user: SUPPORT_USER.name.clone(),
                client_ip: None,
                external_metadata_rx: None,
                helm_chart_version: None,
                authenticator_kind: AuthenticatorKind::None,
                groups: None,
            },
            // Internal session as the support user: no external
            // authentication is involved, so assert `Authenticated` directly.
            Authenticated,
        );
        let mut session_client = self.startup(session).await?;

        // Parse the SQL statement.
        let stmts = mz_sql::parse::parse(sql)?;
        if stmts.len() != 1 {
            bail!("must supply exactly one query");
        }
        let StatementParseResult { ast: stmt, sql } = stmts.into_element();

        // Declare and execute through the unnamed ("") portal.
        const EMPTY_PORTAL: &str = "";
        session_client.start_transaction(Some(1))?;
        session_client
            .declare(EMPTY_PORTAL.into(), stmt, sql.to_string())
            .await?;

        // `pending()` as the cancel future: this internal query is never
        // cancelled from the outside.
        let execute_result = session_client
            .execute(EMPTY_PORTAL.into(), futures::future::pending(), None)
            .await?;
        match execute_result {
            (ExecuteResponse::SendingRowsStreaming { mut rows, .. }, _) => {
                // We have to only drop the session client _after_ we read the
                // result. Otherwise the peek will get cancelled right when we
                // drop the session client. So we wrap it up in an extra stream
                // like this, which owns the client and can return it.
                let owning_response_stream = async_stream::stream! {
                    while let Some(rows) = rows.next().await {
                        yield rows;
                    }
                    drop(session_client);
                };
                Ok(Box::pin(owning_response_stream))
            }
            r => bail!("unsupported response type: {r:?}"),
        }
    }
515
    /// Returns the metrics associated with the adapter layer.
    ///
    /// Borrowed from `self`; clone individual metrics if they must outlive
    /// this client.
    pub fn metrics(&self) -> &Metrics {
        &self.metrics
    }
520
521    /// The current time according to the [`Client`].
522    pub fn now(&self) -> DateTime<Utc> {
523        to_datetime((self.now)())
524    }
525
526    /// Get a metadata and a channel that can be used to append to a webhook source.
527    pub async fn get_webhook_appender(
528        &self,
529        database: String,
530        schema: String,
531        name: String,
532    ) -> Result<AppendWebhookResponse, AppendWebhookError> {
533        let (tx, rx) = oneshot::channel();
534
535        // Send our request.
536        self.send(Command::GetWebhook {
537            database,
538            schema,
539            name,
540            tx,
541        });
542
543        // Using our one shot channel to get the result, returning an error if the sender dropped.
544        let response = rx
545            .await
546            .map_err(|_| anyhow::anyhow!("failed to receive webhook response"))?;
547
548        response
549    }
550
551    /// Gets the current value of all system variables.
552    pub async fn get_system_vars(&self) -> SystemVars {
553        let (tx, rx) = oneshot::channel();
554        self.send(Command::GetSystemVars { tx });
555        rx.await.expect("coordinator unexpectedly gone")
556    }
557
558    #[instrument(level = "debug")]
559    pub(crate) fn send(&self, cmd: Command) {
560        self.inner_cmd_tx
561            .send((OpenTelemetryContext::obtain(), cmd))
562            .expect("coordinator unexpectedly gone");
563    }
564}
565
/// A coordinator client that is bound to a connection.
///
/// See also [`Client`].
pub struct SessionClient {
    // Invariant: inner may only be `None` after the session has been terminated.
    // Once the session is terminated, no communication to the Coordinator
    // should be attempted.
    inner: Option<Client>,
    // Invariant: session may only be `None` during a method call. Every public
    // method must ensure that `Session` is `Some` before it returns.
    session: Option<Session>,
    // Per-connection timeout state.
    timeouts: Timeout,
    // Optional telemetry sink, cloned from the parent `Client`.
    segment_client: Option<mz_segment::Client>,
    environment_id: EnvironmentId,
    /// Client for frontend peek sequencing; populated at connection startup.
    peek_client: PeekClient,
    /// Whether frontend peek sequencing is enabled; initialized at connection startup.
    // TODO(peek-seq): Currently, this is initialized only at session startup. We'll be able to
    // check the actual feature flag value at every peek (without a Coordinator call) once we'll
    // always have a catalog snapshot at hand.
    pub enable_frontend_peek_sequencing: bool,
}
588
589impl SessionClient {
590    /// Parses a SQL expression, reporting failures as a telemetry event if
591    /// possible.
592    pub fn parse<'a>(
593        &self,
594        sql: &'a str,
595    ) -> Result<Result<Vec<StatementParseResult<'a>>, ParserStatementError>, String> {
596        match mz_sql::parse::parse_with_limit(sql) {
597            Ok(Err(e)) => {
598                self.track_statement_parse_failure(&e);
599                Ok(Err(e))
600            }
601            r => r,
602        }
603    }
604
605    fn track_statement_parse_failure(&self, parse_error: &ParserStatementError) {
606        let session = self.session.as_ref().expect("session invariant violated");
607        let Some(user_id) = session.user().external_metadata.as_ref().map(|m| m.user_id) else {
608            return;
609        };
610        let Some(segment_client) = &self.segment_client else {
611            return;
612        };
613        let Some(statement_kind) = parse_error.statement else {
614            return;
615        };
616        let Some((action, object_type)) = telemetry::analyze_audited_statement(statement_kind)
617        else {
618            return;
619        };
620        let event_type = StatementFailureType::ParseFailure;
621        let event_name = format!(
622            "{} {} {}",
623            object_type.as_title_case(),
624            action.as_title_case(),
625            event_type.as_title_case(),
626        );
627        segment_client.environment_track(
628            &self.environment_id,
629            event_name,
630            json!({
631                "statement_kind": statement_kind,
632                "error": &parse_error.error,
633            }),
634            EventDetails {
635                user_id: Some(user_id),
636                application_name: Some(session.application_name()),
637                ..Default::default()
638            },
639        );
640    }
641
    /// Verifies and returns the named prepared statement.
    ///
    /// We need to verify each use against a fresh catalog snapshot to make
    /// sure the prepared statement is still safe to use.
    pub async fn get_prepared_statement(
        &mut self,
        name: &str,
    ) -> Result<&PreparedStatement, AdapterError> {
        let catalog = self.catalog_snapshot("get_prepared_statement").await;
        Coordinator::verify_prepared_statement(&catalog, self.session(), name)?;
        // Verification above guarantees the statement exists; the unverified
        // lookup only re-fetches it to return a borrow.
        Ok(self
            .session()
            .get_prepared_statement_unverified(name)
            .expect("must exist"))
    }
655
    /// Saves the parsed statement as a prepared statement.
    ///
    /// The prepared statement is saved in the connection's [`crate::session::Session`]
    /// under the specified name.
    pub async fn prepare(
        &mut self,
        name: String,
        stmt: Option<Statement<Raw>>,
        sql: String,
        param_types: Vec<Option<SqlScalarType>>,
    ) -> Result<(), AdapterError> {
        let catalog = self.catalog_snapshot("prepare").await;

        // Note: This failpoint is used to simulate a request outliving the external connection
        // that made it. The closure is invoked immediately; it only sets the
        // flag, and the actual sleep happens below at an await point.
        let mut async_pause = false;
        (|| {
            fail::fail_point!("async_prepare", |val| {
                async_pause = val.map_or(false, |val| val.parse().unwrap_or(false))
            });
        })();
        if async_pause {
            tokio::time::sleep(Duration::from_secs(1)).await;
        };

        // Describe the statement against the current catalog; errors here
        // surface as prepare-time failures.
        let desc = Coordinator::describe(&catalog, self.session(), stmt.clone(), param_types)?;
        let now = self.now();
        // Record the catalog/session state the statement was prepared
        // against, so later uses can detect staleness.
        let state_revision = StateRevision {
            catalog_revision: catalog.transient_revision(),
            session_state_revision: self.session().state_revision(),
        };
        self.session()
            .set_prepared_statement(name, stmt, sql, desc, state_revision, now);
        Ok(())
    }
691
692    /// Binds a statement to a portal.
693    #[mz_ore::instrument(level = "debug")]
694    pub async fn declare(
695        &mut self,
696        name: String,
697        stmt: Statement<Raw>,
698        sql: String,
699    ) -> Result<(), AdapterError> {
700        let catalog = self.catalog_snapshot("declare").await;
701        let param_types = vec![];
702        let desc =
703            Coordinator::describe(&catalog, self.session(), Some(stmt.clone()), param_types)?;
704        let params = vec![];
705        let result_formats = vec![mz_pgwire_common::Format::Text; desc.arity()];
706        let now = self.now();
707        let logging = self.session().mint_logging(sql, Some(&stmt), now);
708        let state_revision = StateRevision {
709            catalog_revision: catalog.transient_revision(),
710            session_state_revision: self.session().state_revision(),
711        };
712        self.session().set_portal(
713            name,
714            desc,
715            Some(stmt),
716            logging,
717            params,
718            result_formats,
719            state_revision,
720        )?;
721        Ok(())
722    }
723
    /// Executes a previously-bound portal.
    ///
    /// Note: the provided `cancel_future` must be cancel-safe as it's polled in a `select!` loop.
    ///
    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
    /// triggering the execution of the underlying query.
    #[mz_ore::instrument(level = "debug")]
    pub async fn execute(
        &mut self,
        portal_name: String,
        cancel_future: impl Future<Output = std::io::Error> + Send,
        outer_ctx_extra: Option<ExecuteContextGuard>,
    ) -> Result<(ExecuteResponse, Instant), AdapterError> {
        // Captured up front so the returned Instant reflects when execution
        // was requested, not when the response arrived.
        let execute_started = Instant::now();

        let mut outer_ctx_extra = outer_ctx_extra;

        // Unroll SQL `EXECUTE <prepared> (...)` so the inner statement
        // flows through `try_frontend_peek` below, rather than being
        // re-dispatched via `Command::Execute` from the coordinator's
        // `Plan::Execute` handler. Without this, a prepared statement
        // would route differently from the same statement issued
        // directly.
        //
        // On a successful unroll, `unroll_sql_execute` also returns a
        // catalog snapshot (threaded through to avoid taking a second
        // one) and begins EXECUTE-level statement logging on the outer
        // portal — so `mz_statement_execution_history` records
        // `EXECUTE foo (...)`, not the inner SQL — installing the
        // resulting `ExecuteContextGuard` into `outer_ctx_extra`.
        let (portal_name, catalog) = self
            .unroll_sql_execute(portal_name, &mut outer_ctx_extra)
            .await?;

        // Attempt peek sequencing in the session task.
        // If unsupported, fall back to the Coordinator path.
        // TODO(peek-seq): wire up cancel_future
        let peek_result = self
            .try_frontend_peek(&portal_name, catalog, &mut outer_ctx_extra)
            .await?;
        if let Some(resp) = peek_result {
            debug!("frontend peek succeeded");
            // Frontend peek handled the execution and retired outer_ctx_extra if it existed.
            // No additional work needed here.
            return Ok((resp, execute_started));
        } else {
            debug!("frontend peek did not happen, falling back to `Command::Execute`");
            // If we bailed out, outer_ctx_extra is still present (if it was originally).
            // `Command::Execute` will handle it.
            // (This is not true if we bailed out _after_ the frontend peek sequencing has already
            // begun its own statement logging. That case would be a bug.)
        }

        // Coordinator fallback: ship the portal name and session to the
        // coordinator, racing the response against `cancel_future`.
        let response = self
            .send_with_cancel(
                |tx, session| Command::Execute {
                    portal_name,
                    session,
                    tx,
                    outer_ctx_extra,
                },
                cancel_future,
            )
            .await?;
        Ok((response, execute_started))
    }
790
    /// If the named portal binds a SQL `EXECUTE <prepared>`, resolve the
    /// prepared statement, install a fresh portal for the inner statement
    /// (carrying the EXECUTE's actual parameter values), and return that
    /// portal's name so the caller can run `try_frontend_peek` against it.
    ///
    /// Only ever unrolls one level: the parser rejects
    /// `PREPARE foo AS EXECUTE bar` (matching Postgres), so the inner
    /// statement is guaranteed not to be another `EXECUTE`. A failsafe below
    /// surfaces an internal error if that invariant is ever violated.
    ///
    /// When the portal does not bind an `EXECUTE` — the common case —
    /// returns the original portal name and `None`, costing only a portal
    /// lookup. On unroll, also returns the catalog snapshot taken here so
    /// the caller can thread it into `try_frontend_peek`, which reuses it
    /// instead of taking its own.
    async fn unroll_sql_execute(
        &mut self,
        portal_name: String,
        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
    ) -> Result<(String, Option<Arc<Catalog>>), AdapterError> {
        // Clone out the portal pieces we need, so the borrow of
        // `self.session` ends before the `.await`s below.
        let (stmt, params, outer_logging, outer_lifecycle_timestamps) = {
            let session = self.session.as_ref().expect("SessionClient invariant");
            let portal = match session.get_portal_unverified(&portal_name) {
                Some(p) => p,
                // No portal: let `try_frontend_peek` surface the
                // standard "missing portal" error.
                None => return Ok((portal_name, None)),
            };
            match &portal.stmt {
                Some(stmt) => (
                    Arc::clone(stmt),
                    portal.parameters.clone(),
                    Arc::clone(&portal.logging),
                    portal.lifecycle_timestamps.clone(),
                ),
                // Empty portal (no statement bound): nothing to unroll.
                None => return Ok((portal_name, None)),
            }
        };

        // Only EXECUTE statements need unrolling. Bail out before taking a
        // catalog snapshot in the (overwhelmingly common) non-EXECUTE case.
        if !matches!(&*stmt, Statement::Execute(_)) {
            return Ok((portal_name, None));
        }

        let catalog = self.catalog_snapshot("unroll_sql_execute").await;

        // Validate the outer EXECUTE portal against the (possibly newer)
        // catalog: ensures the recorded portal description still matches
        // what describing the EXECUTE would produce now.
        {
            let session = self.session.as_mut().expect("SessionClient invariant");
            Coordinator::verify_portal(&catalog, session, &portal_name)?;
        }

        // Bump query_total for the outer EXECUTE itself. The inner
        // statement gets its own increment inside `try_frontend_peek_inner`
        // (or, on bailout, in the coordinator's `handle_execute`).
        {
            let session = self.session.as_ref().expect("SessionClient invariant");
            session
                .metrics()
                .query_total(&[
                    metrics::session_type_label_value(session.user()),
                    metrics::statement_type_label_value(&stmt),
                ])
                .inc();
        }

        // Begin EXECUTE-level statement logging up front, so that planning
        // errors below produce an `Errored` end-event in
        // `mz_statement_execution_history` rather than no entry at all.
        //
        // We pass the *outer* portal's `logging` and pgwire-bound `params`
        // so the recorded entry shows the user-visible `EXECUTE foo (...)`,
        // not the inner SQL. The id (if any) moves into `outer_ctx_extra`
        // below for `try_frontend_peek` to retire; on planning error we
        // explicitly emit an `Errored` end-event below.
        let began_outer_logging = outer_ctx_extra.is_none();
        let logging_id: Option<crate::statement_logging::StatementLoggingId> =
            if began_outer_logging {
                let session = self.session.as_mut().expect("SessionClient invariant");
                let result = self
                    .peek_client
                    .statement_logging_frontend
                    .begin_statement_execution(
                        session,
                        &params,
                        &outer_logging,
                        catalog.system_config(),
                        outer_lifecycle_timestamps,
                    );
                if let Some((id, began_execution, mseh_update, prepared_statement)) = result {
                    self.peek_client.log_began_execution(
                        began_execution,
                        mseh_update,
                        prepared_statement,
                    );
                    Some(id)
                } else {
                    // Sampling declined to log this execution.
                    None
                }
            } else {
                None
            };

        let new_portal_name = match self.install_inner_portal_for_execute(&catalog, &stmt, &params)
        {
            Ok(name) => name,
            Err(err) => {
                // Close out the statement-logging entry we opened above so
                // the failure is visible in the history.
                if let Some(id) = logging_id {
                    self.peek_client.log_ended_execution(
                        id,
                        StatementEndedExecutionReason::Errored {
                            error: err.to_string(),
                        },
                    );
                }
                return Err(err);
            }
        };

        // Hand off to `outer_ctx_extra` whenever we entered the begin path
        // for the outer EXECUTE — even if `begin_statement_execution`
        // returned `None` (sampling decided not to sample, or logging is
        // disabled for the user). This mirrors the original coord path,
        // which always installs a guard via
        // `ExecuteContextGuard::new(maybe_uuid, ...)`. Without this, the
        // inner portal would be treated as a fresh statement by
        // `try_frontend_peek` (or the fallback `Command::Execute` path)
        // and re-account its bytes against
        // `mz_statement_logging_unsampled_bytes`, double-counting the
        // inner SQL.
        if began_outer_logging {
            // Soft invariant: `try_frontend_peek` takes ownership of
            // `outer_ctx_extra` immediately, so this guard's `Drop` is
            // unreachable on the normal flow and the dummy channel is
            // never used. If a panic does fire `Drop` between here and
            // that takeover, the `Aborted` end-event is silently lost
            // — an acceptable trade given the panic implies the
            // connection is going down anyway.
            let (dummy_tx, _dummy_rx) = mpsc::unbounded_channel();
            *outer_ctx_extra = Some(ExecuteContextGuard::new(logging_id, dummy_tx));
        }

        Ok((new_portal_name, Some(catalog)))
    }
938
    /// Helper for [`Self::unroll_sql_execute`]: plans the outer
    /// `Statement::Execute`, verifies the referenced prepared statement, and
    /// installs a fresh portal carrying the inner statement plus the
    /// EXECUTE's bound parameter values. Returns the new portal's name.
    ///
    /// Split out so [`Self::unroll_sql_execute`] can wrap the fallible work
    /// in a single error-handling site that emits an `Errored` end-event
    /// for the EXECUTE-level statement-logging entry.
    fn install_inner_portal_for_execute(
        &mut self,
        catalog: &Arc<Catalog>,
        stmt: &Arc<Statement<Raw>>,
        params: &mz_sql::plan::Params,
    ) -> Result<String, AdapterError> {
        use mz_sql::plan::Plan;

        // Name-resolve and plan the outer EXECUTE; this is how we learn
        // which prepared statement it refers to and what parameter values
        // it binds.
        let execute_plan = {
            let session = self.session.as_mut().expect("SessionClient invariant");
            let conn_catalog = catalog.for_session(session);
            let (resolved_stmt, resolved_ids) =
                mz_sql::names::resolve(&conn_catalog, (**stmt).clone())?;
            let pcx = session.pcx();
            let plan = mz_sql::plan::plan(
                Some(pcx),
                &conn_catalog,
                resolved_stmt,
                params,
                &resolved_ids,
            )?;
            match plan {
                Plan::Execute(plan) => plan,
                other => {
                    // Planning a `Statement::Execute` must yield
                    // `Plan::Execute`. If it doesn't, the planner
                    // contract is broken.
                    return Err(AdapterError::Internal(format!(
                        "planning Statement::Execute yielded unexpected plan: {:?}",
                        mz_sql::plan::PlanKind::from(&other),
                    )));
                }
            }
        };

        // Verify and install the inner portal. Mirrors
        // `Coordinator::sequence_execute`. The new portal carries the inner
        // prepared statement's `logging`, but `try_frontend_peek` will see
        // `outer_ctx_extra=Some(...)` and inherit the EXECUTE-level logging
        // instead of starting fresh from this portal.
        let session = self.session.as_mut().expect("SessionClient invariant");
        Coordinator::verify_prepared_statement(catalog, session, &execute_plan.name)?;
        let ps = session
            .get_prepared_statement_unverified(&execute_plan.name)
            .expect("verified above");
        let inner_stmt = ps.stmt().cloned();
        let inner_desc = ps.desc().clone();
        let state_revision = ps.state_revision;
        let inner_logging = Arc::clone(ps.logging());

        // Failsafe: `PREPARE foo AS EXECUTE bar` is rejected by the parser,
        // so the resolved inner statement must not be another `EXECUTE`. If
        // that ever changes, we'd silently skip frontend sequencing for the
        // deeper EXECUTEs — surface it as an internal error instead.
        if let Some(inner) = inner_stmt.as_ref() {
            if matches!(inner, Statement::Execute(_)) {
                return Err(AdapterError::Internal(format!(
                    "nested EXECUTE: prepared statement {} resolves to another EXECUTE; \
                     parser should reject `PREPARE ... AS EXECUTE ...`",
                    execute_plan.name.quoted(),
                )));
            }
        }

        session.create_new_portal(
            inner_stmt,
            inner_logging,
            inner_desc,
            execute_plan.params,
            Vec::new(),
            state_revision,
        )
    }
1020
    /// Returns the current time in epoch milliseconds, as reported by the
    /// inner client's `now` function (mockable in tests).
    fn now(&self) -> EpochMillis {
        (self.inner().now)()
    }
1024
    /// Like [`Self::now`], but converted to a UTC [`DateTime`].
    fn now_datetime(&self) -> DateTime<Utc> {
        to_datetime(self.now())
    }
1028
1029    /// Starts a transaction based on implicit:
1030    /// - `None`: InTransaction
1031    /// - `Some(1)`: Started
1032    /// - `Some(n > 1)`: InTransactionImplicit
1033    /// - `Some(0)`: no change
1034    pub fn start_transaction(&mut self, implicit: Option<usize>) -> Result<(), AdapterError> {
1035        let now = self.now_datetime();
1036        let session = self.session.as_mut().expect("session invariant violated");
1037        let result = match implicit {
1038            None => session.start_transaction(now, None, None),
1039            Some(stmts) => {
1040                session.start_transaction_implicit(now, stmts);
1041                Ok(())
1042            }
1043        };
1044        result
1045    }
1046
    /// Ends a transaction. Even if an error is returned, guarantees that the transaction in the
    /// session and Coordinator has cleared its state.
    #[instrument(level = "debug")]
    pub async fn end_transaction(
        &mut self,
        action: EndTransactionAction,
    ) -> Result<ExecuteResponse, AdapterError> {
        let res = self
            .send(|tx, session| Command::Commit {
                action,
                session,
                tx,
            })
            .await;
        // Commit isn't guaranteed to set the session's state to anything specific, so clear it
        // here. It's safe to ignore the returned `TransactionStatus` because that doesn't contain
        // any data that the Coordinator must act on for correctness.
        // Note: this runs even when `res` is an error, which is what upholds
        // the "even if an error is returned" guarantee in the doc comment.
        let _ = self.session().clear_transaction();
        res
    }
1067
1068    /// Fails a transaction.
1069    pub fn fail_transaction(&mut self) {
1070        let session = self.session.take().expect("session invariant violated");
1071        let session = session.fail_transaction();
1072        self.session = Some(session);
1073    }
1074
    /// Fetches the catalog.
    ///
    /// `context` labels the `catalog_snapshot_seconds` metric so that
    /// different call sites can be distinguished in monitoring.
    #[instrument(level = "debug")]
    pub async fn catalog_snapshot(&self, context: &str) -> Arc<Catalog> {
        let start = std::time::Instant::now();
        let CatalogSnapshot { catalog } = self
            .send_without_session(|tx| Command::CatalogSnapshot { tx })
            .await;
        // Record how long the coordinator round trip took, per context.
        self.inner()
            .metrics()
            .catalog_snapshot_seconds
            .with_label_values(&[context])
            .observe(start.elapsed().as_secs_f64());
        catalog
    }
1089
1090    /// Dumps the catalog to a JSON string.
1091    ///
1092    /// No authorization is performed, so access to this function must be limited to internal
1093    /// servers or superusers.
1094    pub async fn dump_catalog(&self) -> Result<CatalogDump, AdapterError> {
1095        let catalog = self.catalog_snapshot("dump_catalog").await;
1096        catalog.dump().map_err(AdapterError::from)
1097    }
1098
1099    /// Checks the catalog for internal consistency, returning a JSON object describing the
1100    /// inconsistencies, if there are any.
1101    ///
1102    /// No authorization is performed, so access to this function must be limited to internal
1103    /// servers or superusers.
1104    pub async fn check_catalog(&self) -> Result<(), serde_json::Value> {
1105        let catalog = self.catalog_snapshot("check_catalog").await;
1106        catalog.check_consistency()
1107    }
1108
1109    /// Checks the coordinator for internal consistency, returning a JSON object describing the
1110    /// inconsistencies, if there are any. This is a superset of checks that check_catalog performs,
1111    ///
1112    /// No authorization is performed, so access to this function must be limited to internal
1113    /// servers or superusers.
1114    pub async fn check_coordinator(&self) -> Result<(), serde_json::Value> {
1115        self.send_without_session(|tx| Command::CheckConsistency { tx })
1116            .await
1117            .map_err(|inconsistencies| {
1118                serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1119                    serde_json::Value::String("failed to serialize inconsistencies".to_string())
1120                })
1121            })
1122    }
1123
    /// Dumps the coordinator's in-memory state to a JSON value for
    /// debugging/introspection.
    pub async fn dump_coordinator_state(&self) -> Result<serde_json::Value, anyhow::Error> {
        self.send_without_session(|tx| Command::Dump { tx }).await
    }
1127
1128    /// Tells the coordinator a statement has finished execution, in the cases
1129    /// where we have no other reason to communicate with the coordinator.
1130    pub fn retire_execute(
1131        &self,
1132        guard: ExecuteContextGuard,
1133        reason: StatementEndedExecutionReason,
1134    ) {
1135        if !guard.is_trivial() {
1136            let data = guard.defuse();
1137            let cmd = Command::RetireExecute { data, reason };
1138            self.inner().send(cmd);
1139        }
1140    }
1141
1142    /// Sets up a streaming COPY FROM STDIN operation.
1143    ///
1144    /// Sends a command to the coordinator to create a background batch
1145    /// builder task. Returns a [`CopyFromStdinWriter`] that pgwire uses
1146    /// to stream decoded rows.
1147    pub async fn start_copy_from_stdin(
1148        &mut self,
1149        target_id: CatalogItemId,
1150        target_name: String,
1151        columns: Vec<ColumnIndex>,
1152        row_desc: mz_repr::RelationDesc,
1153        params: mz_pgcopy::CopyFormatParams<'static>,
1154    ) -> Result<CopyFromStdinWriter, AdapterError> {
1155        self.send(|tx, session| Command::StartCopyFromStdin {
1156            target_id,
1157            target_name,
1158            columns,
1159            row_desc,
1160            params,
1161            session,
1162            tx,
1163        })
1164        .await
1165    }
1166
1167    /// Commits staged COPY FROM STDIN batches to a table.
1168    ///
1169    /// Adds the pre-built persist batches to the session's transaction
1170    /// operations. The actual commit happens when the transaction ends.
1171    pub fn stage_copy_from_stdin_batches(
1172        &mut self,
1173        target_id: CatalogItemId,
1174        batches: Vec<mz_persist_client::batch::ProtoBatch>,
1175    ) -> Result<(), AdapterError> {
1176        use crate::session::{TransactionOps, WriteOp};
1177        use mz_storage_client::client::TableData;
1178
1179        self.session()
1180            .add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
1181                id: target_id,
1182                rows: TableData::Batches(batches.into()),
1183            }]))?;
1184        Ok(())
1185    }
1186
    /// Gets the current value of all system variables.
    ///
    /// Delegates to the shared inner client; no session state is involved.
    pub async fn get_system_vars(&self) -> SystemVars {
        self.inner().get_system_vars().await
    }
1191
    /// Updates the specified system variables to the specified values.
    pub async fn set_system_vars(
        &mut self,
        vars: BTreeMap<String, String>,
    ) -> Result<(), AdapterError> {
        // Clone the connection id first so the closure below captures an
        // owned value rather than borrowing `self`.
        let conn_id = self.session().conn_id().clone();
        self.send_without_session(|tx| Command::SetSystemVars { vars, conn_id, tx })
            .await
    }
1201
    /// Injects audit events into the catalog via the coordinator.
    ///
    /// No authorization is performed, so access to this function must be limited to internal
    /// servers or superusers.
    pub async fn inject_audit_events(
        &mut self,
        events: Vec<crate::catalog::InjectedAuditEvent>,
    ) -> Result<(), AdapterError> {
        // Clone the connection id first so the closure below captures an
        // owned value rather than borrowing `self`.
        let conn_id = self.session().conn_id().clone();
        self.send_without_session(|tx| Command::InjectAuditEvents {
            events,
            conn_id,
            tx,
        })
        .await
    }
1218
    /// Terminates the client session.
    ///
    /// After this returns, `self.inner` is cleared, so this client can no
    /// longer communicate with the coordinator.
    pub async fn terminate(&mut self) {
        let conn_id = self.session().conn_id().clone();
        let res = self
            .send_without_session(|tx| Command::Terminate {
                conn_id,
                tx: Some(tx),
            })
            .await;
        if let Err(e) = res {
            // Nothing we can do to handle a failed terminate so we just log and ignore it.
            error!("Unable to terminate session: {e:?}");
        }
        // Prevent any communication with Coordinator after session is terminated.
        self.inner = None;
    }
1235
    /// Returns a mutable reference to the session bound to this client.
    ///
    /// # Panics
    ///
    /// Panics if the session is currently checked out, e.g. while a command
    /// sent via `send_with_cancel` is in flight.
    pub fn session(&mut self) -> &mut Session {
        self.session.as_mut().expect("session invariant violated")
    }
1240
    /// Returns a reference to the inner client.
    ///
    /// # Panics
    ///
    /// Panics after `terminate` has cleared the inner client.
    pub fn inner(&self) -> &Client {
        self.inner.as_ref().expect("inner invariant violated")
    }
1245
1246    async fn send_without_session<T, F>(&self, f: F) -> T
1247    where
1248        F: FnOnce(oneshot::Sender<T>) -> Command,
1249    {
1250        let (tx, rx) = oneshot::channel();
1251        self.inner().send(f(tx));
1252        rx.await.expect("sender dropped")
1253    }
1254
    /// Like [`Self::send_with_cancel`], but with a `cancel_future` that never
    /// completes, i.e. the command cannot be canceled.
    #[instrument(level = "debug")]
    async fn send<T, F>(&mut self, f: F) -> Result<T, AdapterError>
    where
        F: FnOnce(oneshot::Sender<Response<T>>, Session) -> Command,
    {
        self.send_with_cancel(f, futures::future::pending()).await
    }
1262
    /// Send a [`Command`] to the Coordinator, with the ability to cancel the command.
    ///
    /// Note: the provided `cancel_future` must be cancel-safe as it's polled in a `select!` loop.
    #[instrument(level = "debug")]
    async fn send_with_cancel<T, F>(
        &mut self,
        f: F,
        cancel_future: impl Future<Output = std::io::Error> + Send,
    ) -> Result<T, AdapterError>
    where
        F: FnOnce(oneshot::Sender<Response<T>>, Session) -> Command,
    {
        // Check the session out for the duration of the command; it is put
        // back either by the guard below or by the response arm of the loop.
        let session = self.session.take().expect("session invariant violated");
        // Set below for the commands whose success/error rate we report.
        let mut typ = None;
        let application_name = session.application_name();
        let name_hint = ApplicationNameHint::from_str(application_name);
        let conn_id = session.conn_id().clone();
        let (tx, rx) = oneshot::channel();

        // Destructure self so we can hold a mutable reference to the inner client and session at
        // the same time.
        let Self {
            inner: inner_client,
            session: client_session,
            ..
        } = self;

        // TODO(parkmycar): Leaking this invariant here doesn't feel great, but calling
        // `self.client()` doesn't work because then Rust takes a borrow on the entirity of self.
        let inner_client = inner_client.as_ref().expect("inner invariant violated");

        // ~~SPOOKY ZONE~~
        //
        // This guard prevents a race where a `Session` is returned on `rx` but never placed
        // back in `self` because the Future returned by this function is concurrently dropped
        // with the Coordinator sending a response.
        let mut guarded_rx = rx.with_guard(|response: Response<_>| {
            *client_session = Some(response.session);
        });

        inner_client.send({
            let cmd = f(tx, session);
            // Measure the success and error rate of certain commands:
            // - declare reports success of SQL statement planning
            // - execute reports success of dataflow execution
            match cmd {
                Command::Execute { .. } => typ = Some("execute"),
                Command::GetWebhook { .. } => typ = Some("webhook"),
                Command::StartCopyFromStdin { .. }
                | Command::Startup { .. }
                | Command::AuthenticatePassword { .. }
                | Command::AuthenticateGetSASLChallenge { .. }
                | Command::AuthenticateVerifySASLProof { .. }
                | Command::CheckRoleCanLogin { .. }
                | Command::CatalogSnapshot { .. }
                | Command::Commit { .. }
                | Command::CancelRequest { .. }
                | Command::PrivilegedCancelRequest { .. }
                | Command::GetSystemVars { .. }
                | Command::SetSystemVars { .. }
                | Command::Terminate { .. }
                | Command::RetireExecute { .. }
                | Command::CheckConsistency { .. }
                | Command::Dump { .. }
                | Command::GetComputeInstanceClient { .. }
                | Command::GetOracle { .. }
                | Command::DetermineRealTimeRecentTimestamp { .. }
                | Command::GetTransactionReadHoldsBundle { .. }
                | Command::StoreTransactionReadHolds { .. }
                | Command::ExecuteSlowPathPeek { .. }
                | Command::ExecuteSubscribe { .. }
                | Command::CopyToPreflight { .. }
                | Command::ExecuteCopyTo { .. }
                | Command::ExecuteSideEffectingFunc { .. }
                | Command::RegisterFrontendPeek { .. }
                | Command::UnregisterFrontendPeek { .. }
                | Command::ExplainTimestamp { .. }
                | Command::FrontendStatementLogging(..)
                | Command::InjectAuditEvents { .. } => {}
            };
            cmd
        });

        // Wait for the response. If `cancel_future` fires first, issue a
        // single privileged cancel request and then keep waiting for the
        // (now canceled) command's response — we never return without it.
        let mut cancel_future = pin::pin!(cancel_future);
        let mut cancelled = false;
        loop {
            tokio::select! {
                res = &mut guarded_rx => {
                    // We received a result, so drop our guard to drop our borrows.
                    drop(guarded_rx);

                    let res = res.expect("sender dropped");
                    let status = res.result.is_ok().then_some("success").unwrap_or("error");
                    if let Err(err) = res.result.as_ref() {
                        if name_hint.should_trace_errors() {
                            tracing::warn!(?err, ?name_hint, "adapter response error");
                        }
                    }

                    if let Some(typ) = typ {
                        inner_client
                            .metrics
                            .commands
                            .with_label_values(&[typ, status, name_hint.as_str()])
                            .inc();
                    }
                    *client_session = Some(res.session);
                    return res.result;
                },
                // The `!cancelled` condition ensures we only send one cancel
                // request even though the loop keeps polling.
                _err = &mut cancel_future, if !cancelled => {
                    cancelled = true;
                    inner_client.send(Command::PrivilegedCancelRequest {
                        conn_id: conn_id.clone(),
                    });
                }
            };
        }
    }
1381
1382    pub fn add_idle_in_transaction_session_timeout(&mut self) {
1383        let session = self.session();
1384        let timeout_dur = session.vars().idle_in_transaction_session_timeout();
1385        if !timeout_dur.is_zero() {
1386            let timeout_dur = timeout_dur.clone();
1387            if let Some(txn) = session.transaction().inner() {
1388                let txn_id = txn.id.clone();
1389                let timeout = TimeoutType::IdleInTransactionSession(txn_id);
1390                self.timeouts.add_timeout(timeout, timeout_dur);
1391            }
1392        }
1393    }
1394
1395    pub fn remove_idle_in_transaction_session_timeout(&mut self) {
1396        let session = self.session();
1397        if let Some(txn) = session.transaction().inner() {
1398            let txn_id = txn.id.clone();
1399            self.timeouts
1400                .remove_timeout(&TimeoutType::IdleInTransactionSession(txn_id));
1401        }
1402    }
1403
    /// Waits for the next fired session timeout, if any.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel safe. If `recv` is used as the event in a
    /// `tokio::select!` statement and some other branch
    /// completes first, it is guaranteed that no messages were received on this
    /// channel.
    pub async fn recv_timeout(&mut self) -> Option<TimeoutType> {
        // Delegates to the underlying unbounded mpsc receiver, which is
        // itself cancel safe.
        self.timeouts.recv().await
    }
1413
1414    /// Attempt to sequence a peek from the session task.
1415    ///
1416    /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
1417    /// Coordinator's sequencing. If it returns an error, it should be returned to the user.
1418    ///
1419    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
1420    /// triggering the execution of the underlying query.
1421    pub(crate) async fn try_frontend_peek(
1422        &mut self,
1423        portal_name: &str,
1424        catalog: Option<Arc<Catalog>>,
1425        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
1426    ) -> Result<Option<ExecuteResponse>, AdapterError> {
1427        if self.enable_frontend_peek_sequencing {
1428            let session = self.session.as_mut().expect("SessionClient invariant");
1429            self.peek_client
1430                .try_frontend_peek(portal_name, catalog, session, outer_ctx_extra)
1431                .await
1432        } else {
1433            Ok(None)
1434        }
1435    }
1436}
1437
1438impl Drop for SessionClient {
1439    fn drop(&mut self) {
1440        // We may not have a session if this client was dropped while awaiting
1441        // a response. In this case, it is the coordinator's responsibility to
1442        // terminate the session.
1443        if let Some(session) = self.session.take() {
1444            // We may not have a connection to the Coordinator if the session was
1445            // prematurely terminated, for example due to a timeout.
1446            if let Some(inner) = &self.inner {
1447                inner.send(Command::Terminate {
1448                    conn_id: session.conn_id().clone(),
1449                    tx: None,
1450                })
1451            }
1452        }
1453    }
1454}
1455
/// Kinds of session-level timeouts tracked by the session client.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
pub enum TimeoutType {
    /// Fired when a session sits idle inside the given transaction for
    /// longer than the configured idle-in-transaction timeout.
    IdleInTransactionSession(TransactionId),
}
1460
1461impl Display for TimeoutType {
1462    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1463        match self {
1464            TimeoutType::IdleInTransactionSession(txn_id) => {
1465                writeln!(f, "Idle in transaction session for transaction '{txn_id}'")
1466            }
1467        }
1468    }
1469}
1470
1471impl From<TimeoutType> for AdapterError {
1472    fn from(timeout: TimeoutType) -> Self {
1473        match timeout {
1474            TimeoutType::IdleInTransactionSession(_) => {
1475                AdapterError::IdleInTransactionSessionTimeout
1476            }
1477        }
1478    }
1479}
1480
/// Tracks per-session timeouts as background sleep tasks that report back
/// over an in-process channel.
struct Timeout {
    // Sender half; cloned into each spawned timeout task.
    tx: mpsc::UnboundedSender<TimeoutType>,
    // Receiver half; polled by `recv` to learn about fired timeouts.
    rx: mpsc::UnboundedReceiver<TimeoutType>,
    // Handles for pending timeouts; dropping a handle aborts its task.
    active_timeouts: BTreeMap<TimeoutType, AbortOnDropHandle<()>>,
}
1486
1487impl Timeout {
1488    fn new() -> Self {
1489        let (tx, rx) = mpsc::unbounded_channel();
1490        Timeout {
1491            tx,
1492            rx,
1493            active_timeouts: BTreeMap::new(),
1494        }
1495    }
1496
1497    /// # Cancel safety
1498    ///
1499    /// This method is cancel safe. If `recv` is used as the event in a
1500    /// `tokio::select!` statement and some other branch
1501    /// completes first, it is guaranteed that no messages were received on this
1502    /// channel.
1503    ///
1504    /// <https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety>
1505    async fn recv(&mut self) -> Option<TimeoutType> {
1506        self.rx.recv().await
1507    }
1508
1509    fn add_timeout(&mut self, timeout: TimeoutType, duration: Duration) {
1510        let tx = self.tx.clone();
1511        let timeout_key = timeout.clone();
1512        let handle = mz_ore::task::spawn(|| format!("{timeout_key}"), async move {
1513            tokio::time::sleep(duration).await;
1514            let _ = tx.send(timeout);
1515        })
1516        .abort_on_drop();
1517        self.active_timeouts.insert(timeout_key, handle);
1518    }
1519
1520    fn remove_timeout(&mut self, timeout: &TimeoutType) {
1521        self.active_timeouts.remove(timeout);
1522
1523        // Remove the timeout from the rx queue if it exists.
1524        let mut timeouts = Vec::new();
1525        while let Ok(pending_timeout) = self.rx.try_recv() {
1526            if timeout != &pending_timeout {
1527                timeouts.push(pending_timeout);
1528            }
1529        }
1530        for pending_timeout in timeouts {
1531            self.tx.send(pending_timeout).expect("rx is in this struct");
1532        }
1533    }
1534}
1535
/// A wrapper around a Stream of PeekResponseUnary that records when it sees the
/// first row data in the given histogram. It also keeps track of whether we have already observed
/// the end of the underlying stream.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct RecordFirstRowStream {
    /// The underlying stream of rows. Not `Debug`-printable, so it is
    /// excluded from the derived `Debug` impl.
    #[derivative(Debug = "ignore")]
    pub rows: Box<dyn Stream<Item = PeekResponseUnary> + Unpin + Send + Sync>,
    /// The Instant when execution started.
    pub execute_started: Instant,
    /// The histogram where the time since `execute_started` will be recorded when we see the first
    /// row.
    pub time_to_first_row_seconds: Histogram,
    /// Whether we've seen any rows yet (set on the first `Rows` response).
    pub saw_rows: bool,
    /// The Instant when we saw the first row.
    pub recorded_first_row_instant: Option<Instant>,
    /// Whether we have already observed the end of the underlying stream.
    pub no_more_rows: bool,
    /// Whether the first-to-last-byte metric has already been recorded for this stream.
    /// NOTE(review): not written anywhere in this file's visible code —
    /// presumably updated by callers elsewhere; confirm before relying on it.
    pub metric_recorded: bool,
}
1559
1560impl RecordFirstRowStream {
1561    /// Create a new [`RecordFirstRowStream`]
1562    pub fn new(
1563        rows: Box<dyn Stream<Item = PeekResponseUnary> + Unpin + Send + Sync>,
1564        execute_started: Instant,
1565        client: &SessionClient,
1566        instance_id: Option<ComputeInstanceId>,
1567        strategy: Option<StatementExecutionStrategy>,
1568    ) -> Self {
1569        let histogram = Self::histogram(client, instance_id, strategy);
1570        Self {
1571            rows,
1572            execute_started,
1573            time_to_first_row_seconds: histogram,
1574            saw_rows: false,
1575            recorded_first_row_instant: None,
1576            no_more_rows: false,
1577            metric_recorded: false,
1578        }
1579    }
1580
1581    fn histogram(
1582        client: &SessionClient,
1583        instance_id: Option<ComputeInstanceId>,
1584        strategy: Option<StatementExecutionStrategy>,
1585    ) -> Histogram {
1586        let isolation_level = *client
1587            .session
1588            .as_ref()
1589            .expect("session invariant")
1590            .vars()
1591            .transaction_isolation();
1592        let instance = match instance_id {
1593            Some(i) => Cow::Owned(i.to_string()),
1594            None => Cow::Borrowed("none"),
1595        };
1596        let strategy = match strategy {
1597            Some(s) => s.name(),
1598            None => "none",
1599        };
1600
1601        client
1602            .inner()
1603            .metrics()
1604            .time_to_first_row_seconds
1605            .with_label_values(&[instance.as_ref(), isolation_level.as_str(), strategy])
1606    }
1607
1608    /// If you want to match [`RecordFirstRowStream`]'s logic but don't need
1609    /// a UnboundedReceiver, you can tell it when to record an observation.
1610    pub fn record(
1611        execute_started: Instant,
1612        client: &SessionClient,
1613        instance_id: Option<ComputeInstanceId>,
1614        strategy: Option<StatementExecutionStrategy>,
1615    ) {
1616        Self::histogram(client, instance_id, strategy)
1617            .observe(execute_started.elapsed().as_secs_f64());
1618    }
1619
1620    pub async fn recv(&mut self) -> Option<PeekResponseUnary> {
1621        let msg = self.rows.next().await;
1622        if !self.saw_rows && matches!(msg, Some(PeekResponseUnary::Rows(_))) {
1623            self.saw_rows = true;
1624            self.time_to_first_row_seconds
1625                .observe(self.execute_started.elapsed().as_secs_f64());
1626            self.recorded_first_row_instant = Some(Instant::now());
1627        }
1628        if msg.is_none() {
1629            self.no_more_rows = true;
1630        }
1631        msg
1632    }
1633}