mz_adapter/
client.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::collections::BTreeMap;
12use std::fmt::{Debug, Display, Formatter};
13use std::future::Future;
14use std::pin::{self, Pin};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use anyhow::bail;
19use chrono::{DateTime, Utc};
20use derivative::Derivative;
21use futures::{Stream, StreamExt};
22use itertools::Itertools;
23use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
24use mz_auth::password::Password;
25use mz_build_info::BuildInfo;
26use mz_compute_types::ComputeInstanceId;
27use mz_ore::channel::OneshotReceiverExt;
28use mz_ore::collections::CollectionExt;
29use mz_ore::id_gen::{IdAllocator, IdAllocatorInnerBitSet, MAX_ORG_ID, org_id_conn_bits};
30use mz_ore::instrument;
31use mz_ore::now::{EpochMillis, NowFn, to_datetime};
32use mz_ore::result::ResultExt;
33use mz_ore::task::AbortOnDropHandle;
34use mz_ore::thread::JoinOnDropHandle;
35use mz_ore::tracing::OpenTelemetryContext;
36use mz_repr::{CatalogItemId, ColumnIndex, Row, SqlScalarType};
37use mz_sql::ast::{Raw, Statement};
38use mz_sql::catalog::{EnvironmentId, SessionCatalog};
39use mz_sql::session::hint::ApplicationNameHint;
40use mz_sql::session::metadata::SessionMetadata;
41use mz_sql::session::user::SUPPORT_USER;
42use mz_sql::session::vars::{
43    CLUSTER, ENABLE_FRONTEND_PEEK_SEQUENCING, OwnedVarInput, SystemVars, Var,
44};
45use mz_sql_parser::parser::{ParserStatementError, StatementParseResult};
46use prometheus::Histogram;
47use serde_json::json;
48use tokio::sync::{mpsc, oneshot};
49use tracing::{debug, error};
50use uuid::Uuid;
51
52use crate::catalog::Catalog;
53use crate::command::{
54    AuthResponse, CatalogDump, CatalogSnapshot, Command, ExecuteResponse, Response,
55    SASLChallengeResponse, SASLVerifyProofResponse,
56};
57use crate::coord::{Coordinator, ExecuteContextGuard};
58use crate::error::AdapterError;
59use crate::metrics::Metrics;
60use crate::optimize::dataflows::{EvalTime, ExprPrepStyle};
61use crate::optimize::{self, Optimize};
62use crate::session::{
63    EndTransactionAction, PreparedStatement, Session, SessionConfig, StateRevision, TransactionId,
64};
65use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
66use crate::telemetry::{self, EventDetails, SegmentClientExt, StatementFailureType};
67use crate::webhook::AppendWebhookResponse;
68use crate::{AdapterNotice, AppendWebhookError, PeekClient, PeekResponseUnary, StartupResponse};
69
70/// A handle to a running coordinator.
71///
72/// The coordinator runs on its own thread. Dropping the handle will wait for
73/// the coordinator's thread to exit, which will only occur after all
74/// outstanding [`Client`]s for the coordinator have dropped.
75pub struct Handle {
76    pub(crate) session_id: Uuid,
77    pub(crate) start_instant: Instant,
78    pub(crate) _thread: JoinOnDropHandle<()>,
79}
80
81impl Handle {
82    /// Returns the session ID associated with this coordinator.
83    ///
84    /// The session ID is generated on coordinator boot. It lasts for the
85    /// lifetime of the coordinator. Restarting the coordinator will result
86    /// in a new session ID.
87    pub fn session_id(&self) -> Uuid {
88        self.session_id
89    }
90
91    /// Returns the instant at which the coordinator booted.
92    pub fn start_instant(&self) -> Instant {
93        self.start_instant
94    }
95}
96
97/// A coordinator client.
98///
99/// A coordinator client is a simple handle to a communication channel with the
100/// coordinator. It can be cheaply cloned.
101///
102/// Clients keep the coordinator alive. The coordinator will not exit until all
103/// outstanding clients have dropped.
104#[derive(Debug, Clone)]
105pub struct Client {
106    build_info: &'static BuildInfo,
107    inner_cmd_tx: mpsc::UnboundedSender<(OpenTelemetryContext, Command)>,
108    id_alloc: IdAllocator<IdAllocatorInnerBitSet>,
109    now: NowFn,
110    metrics: Metrics,
111    environment_id: EnvironmentId,
112    segment_client: Option<mz_segment::Client>,
113}
114
115impl Client {
116    pub(crate) fn new(
117        build_info: &'static BuildInfo,
118        cmd_tx: mpsc::UnboundedSender<(OpenTelemetryContext, Command)>,
119        metrics: Metrics,
120        now: NowFn,
121        environment_id: EnvironmentId,
122        segment_client: Option<mz_segment::Client>,
123    ) -> Client {
124        // Connection ids are 32 bits and have 3 parts.
125        // 1. MSB bit is always 0 because these are interpreted as an i32, and it is possible some
126        //    driver will not handle a negative id since postgres has never produced one because it
127        //    uses process ids.
128        // 2. Next 12 bits are the lower 12 bits of the org id. This allows balancerd to route
129        //    incoming cancel messages to a subset of the environments.
130        // 3. Last 19 bits are random.
131        let env_lower = org_id_conn_bits(&environment_id.organization_id());
132        Client {
133            build_info,
134            inner_cmd_tx: cmd_tx,
135            id_alloc: IdAllocator::new(1, MAX_ORG_ID, env_lower),
136            now,
137            metrics,
138            environment_id,
139            segment_client,
140        }
141    }
142
143    /// Allocates a client for an incoming connection.
144    pub fn new_conn_id(&self) -> Result<ConnectionId, AdapterError> {
145        self.id_alloc.alloc().ok_or(AdapterError::IdExhaustionError)
146    }
147
148    /// Creates a new session associated with this client for the given user.
149    ///
150    /// It is the caller's responsibility to have authenticated the user.
151    pub fn new_session(&self, config: SessionConfig) -> Session {
152        // We use the system clock to determine when a session connected to Materialize. This is not
153        // intended to be 100% accurate and correct, so we don't burden the timestamp oracle with
154        // generating a more correct timestamp.
155        Session::new(self.build_info, config, self.metrics().session_metrics())
156    }
157
158    /// Preforms an authentication check for the given user.
159    pub async fn authenticate(
160        &self,
161        user: &String,
162        password: &Password,
163    ) -> Result<AuthResponse, AdapterError> {
164        let (tx, rx) = oneshot::channel();
165        self.send(Command::AuthenticatePassword {
166            role_name: user.to_string(),
167            password: Some(password.clone()),
168            tx,
169        });
170        let response = rx.await.expect("sender dropped")?;
171        Ok(response)
172    }
173
174    pub async fn generate_sasl_challenge(
175        &self,
176        user: &String,
177        client_nonce: &String,
178    ) -> Result<SASLChallengeResponse, AdapterError> {
179        let (tx, rx) = oneshot::channel();
180        self.send(Command::AuthenticateGetSASLChallenge {
181            role_name: user.to_string(),
182            nonce: client_nonce.to_string(),
183            tx,
184        });
185        let response = rx.await.expect("sender dropped")?;
186        Ok(response)
187    }
188
189    pub async fn verify_sasl_proof(
190        &self,
191        user: &String,
192        proof: &String,
193        nonce: &String,
194        mock_hash: &String,
195    ) -> Result<SASLVerifyProofResponse, AdapterError> {
196        let (tx, rx) = oneshot::channel();
197        self.send(Command::AuthenticateVerifySASLProof {
198            role_name: user.to_string(),
199            proof: proof.to_string(),
200            auth_message: nonce.to_string(),
201            mock_hash: mock_hash.to_string(),
202            tx,
203        });
204        let response = rx.await.expect("sender dropped")?;
205        Ok(response)
206    }
207
208    /// Upgrades this client to a session client.
209    ///
210    /// A session is a connection that has successfully negotiated parameters,
211    /// like the user. Most coordinator operations are available only after
212    /// upgrading a connection to a session.
213    ///
214    /// Returns a new client that is bound to the session and a response
215    /// containing various details about the startup.
216    #[mz_ore::instrument(level = "debug")]
217    pub async fn startup(&self, session: Session) -> Result<SessionClient, AdapterError> {
218        let user = session.user().clone();
219        let conn_id = session.conn_id().clone();
220        let secret_key = session.secret_key();
221        let uuid = session.uuid();
222        let client_ip = session.client_ip();
223        let application_name = session.application_name().into();
224        let notice_tx = session.retain_notice_transmitter();
225
226        let (tx, rx) = oneshot::channel();
227
228        // ~~SPOOKY ZONE~~
229        //
230        // This guard prevents a race where the startup command finishes, but the Future returned
231        // by this function is concurrently dropped, so we never create a `SessionClient` and thus
232        // never cleanup the initialized Session.
233        let rx = rx.with_guard(|_| {
234            self.send(Command::Terminate {
235                conn_id: conn_id.clone(),
236                tx: None,
237            });
238        });
239
240        self.send(Command::Startup {
241            tx,
242            user,
243            conn_id: conn_id.clone(),
244            secret_key,
245            uuid,
246            client_ip: client_ip.copied(),
247            application_name,
248            notice_tx,
249        });
250
251        // When startup fails, no need to call terminate (handle_startup does this). Delay creating
252        // the client until after startup to sidestep the panic in its `Drop` implementation.
253        let response = rx.await.expect("sender dropped")?;
254
255        // Create the client as soon as startup succeeds (before any await points) so its `Drop` can
256        // handle termination.
257        // Build the PeekClient with controller handles returned from startup.
258        let StartupResponse {
259            role_id,
260            write_notify,
261            session_defaults,
262            catalog,
263            storage_collections,
264            transient_id_gen,
265            optimizer_metrics,
266            persist_client,
267            statement_logging_frontend,
268        } = response;
269
270        let peek_client = PeekClient::new(
271            self.clone(),
272            storage_collections,
273            transient_id_gen,
274            optimizer_metrics,
275            persist_client,
276            statement_logging_frontend,
277        );
278
279        let mut client = SessionClient {
280            inner: Some(self.clone()),
281            session: Some(session),
282            timeouts: Timeout::new(),
283            environment_id: self.environment_id.clone(),
284            segment_client: self.segment_client.clone(),
285            peek_client,
286            enable_frontend_peek_sequencing: false, // initialized below, once we have a ConnCatalog
287        };
288
289        let session = client.session();
290        session.initialize_role_metadata(role_id);
291        let vars_mut = session.vars_mut();
292        for (name, val) in session_defaults {
293            if let Err(err) = vars_mut.set_default(&name, val.borrow()) {
294                // Note: erroring here is unexpected, but we don't want to panic if somehow our
295                // assumptions are wrong.
296                tracing::error!("failed to set peristed default, {err:?}");
297            }
298        }
299        session
300            .vars_mut()
301            .end_transaction(EndTransactionAction::Commit);
302
303        // Stash the future that notifies us of builtin table writes completing, we'll block on
304        // this future before allowing queries from this session against relevant relations.
305        //
306        // Note: We stash the future as opposed to waiting on it here to prevent blocking session
307        // creation on builtin table updates. This improves the latency for session creation and
308        // reduces scheduling load on any dataflows that read from these builtin relations, since
309        // it allows updates to be batched.
310        session.set_builtin_table_updates(write_notify);
311
312        let catalog = catalog.for_session(session);
313
314        let cluster_active = session.vars().cluster().to_string();
315        if session.vars().welcome_message() {
316            let cluster_info = if catalog.resolve_cluster(Some(&cluster_active)).is_err() {
317                format!("{cluster_active} (does not exist)")
318            } else {
319                cluster_active.to_string()
320            };
321
322            // Emit a welcome message, optimized for readability by humans using
323            // interactive tools. If you change the message, make sure that it
324            // formats nicely in both `psql` and the console's SQL shell.
325            session.add_notice(AdapterNotice::Welcome(format!(
326                "connected to Materialize v{}
327  Org ID: {}
328  Region: {}
329  User: {}
330  Cluster: {}
331  Database: {}
332  {}
333  Session UUID: {}
334
335Issue a SQL query to get started. Need help?
336  View documentation: https://materialize.com/s/docs
337  Join our Slack community: https://materialize.com/s/chat
338    ",
339                session.vars().build_info().semver_version(),
340                self.environment_id.organization_id(),
341                self.environment_id.region(),
342                session.vars().user().name,
343                cluster_info,
344                session.vars().database(),
345                match session.vars().search_path() {
346                    [schema] => format!("Schema: {}", schema),
347                    schemas => format!(
348                        "Search path: {}",
349                        schemas.iter().map(|id| id.to_string()).join(", ")
350                    ),
351                },
352                session.uuid(),
353            )));
354        }
355
356        if session.vars().current_object_missing_warnings() {
357            if catalog.active_database().is_none() {
358                let db = session.vars().database().into();
359                session.add_notice(AdapterNotice::UnknownSessionDatabase(db));
360            }
361        }
362
363        // Users stub their toe on their default cluster not existing, so we provide a notice to
364        // help guide them on what do to.
365        let cluster_var = session
366            .vars()
367            .inspect(CLUSTER.name())
368            .expect("cluster should exist");
369        if session.vars().current_object_missing_warnings()
370            && catalog.resolve_cluster(Some(&cluster_active)).is_err()
371        {
372            let cluster_notice = 'notice: {
373                if cluster_var.inspect_session_value().is_some() {
374                    break 'notice Some(AdapterNotice::DefaultClusterDoesNotExist {
375                        name: cluster_active,
376                        kind: "session",
377                        suggested_action: "Pick an extant cluster with SET CLUSTER = name. Run SHOW CLUSTERS to see available clusters.".into(),
378                    });
379                }
380
381                let role_default = catalog.get_role(catalog.active_role_id());
382                let role_cluster = match role_default.vars().get(CLUSTER.name()) {
383                    Some(OwnedVarInput::Flat(name)) => Some(name),
384                    None => None,
385                    // This is unexpected!
386                    Some(v @ OwnedVarInput::SqlSet(_)) => {
387                        tracing::warn!(?v, "SqlSet found for cluster Role Default");
388                        break 'notice None;
389                    }
390                };
391
392                let alter_role = "with `ALTER ROLE <role> SET cluster TO <cluster>;`";
393                match role_cluster {
394                    // If there is no default, suggest a Role default.
395                    None => Some(AdapterNotice::DefaultClusterDoesNotExist {
396                        name: cluster_active,
397                        kind: "system",
398                        suggested_action: format!(
399                            "Set a default cluster for the current role {alter_role}."
400                        ),
401                    }),
402                    // If the default does not exist, suggest to change it.
403                    Some(_) => Some(AdapterNotice::DefaultClusterDoesNotExist {
404                        name: cluster_active,
405                        kind: "role",
406                        suggested_action: format!(
407                            "Change the default cluster for the current role {alter_role}."
408                        ),
409                    }),
410                }
411            };
412
413            if let Some(notice) = cluster_notice {
414                session.add_notice(notice);
415            }
416        }
417
418        client.enable_frontend_peek_sequencing = ENABLE_FRONTEND_PEEK_SEQUENCING
419            .require(catalog.system_vars())
420            .is_ok();
421
422        Ok(client)
423    }
424
425    /// Cancels the query currently running on the specified connection.
426    pub fn cancel_request(&self, conn_id: ConnectionIdType, secret_key: u32) {
427        self.send(Command::CancelRequest {
428            conn_id,
429            secret_key,
430        });
431    }
432
433    /// Executes a single SQL statement that returns rows as the
434    /// `mz_support` user.
435    pub async fn support_execute_one(
436        &self,
437        sql: &str,
438    ) -> Result<Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send>>, anyhow::Error> {
439        // Connect to the coordinator.
440        let conn_id = self.new_conn_id()?;
441        let session = self.new_session(SessionConfig {
442            conn_id,
443            uuid: Uuid::new_v4(),
444            user: SUPPORT_USER.name.clone(),
445            client_ip: None,
446            external_metadata_rx: None,
447            internal_user_metadata: None,
448            helm_chart_version: None,
449        });
450        let mut session_client = self.startup(session).await?;
451
452        // Parse the SQL statement.
453        let stmts = mz_sql::parse::parse(sql)?;
454        if stmts.len() != 1 {
455            bail!("must supply exactly one query");
456        }
457        let StatementParseResult { ast: stmt, sql } = stmts.into_element();
458
459        const EMPTY_PORTAL: &str = "";
460        session_client.start_transaction(Some(1))?;
461        session_client
462            .declare(EMPTY_PORTAL.into(), stmt, sql.to_string())
463            .await?;
464
465        match session_client
466            .execute(EMPTY_PORTAL.into(), futures::future::pending(), None)
467            .await?
468        {
469            (ExecuteResponse::SendingRowsStreaming { mut rows, .. }, _) => {
470                // We have to only drop the session client _after_ we read the
471                // result. Otherwise the peek will get cancelled right when we
472                // drop the session client. So we wrap it up in an extra stream
473                // like this, which owns the client and can return it.
474                let owning_response_stream = async_stream::stream! {
475                    while let Some(rows) = rows.next().await {
476                        yield rows;
477                    }
478                    drop(session_client);
479                };
480                Ok(Box::pin(owning_response_stream))
481            }
482            r => bail!("unsupported response type: {r:?}"),
483        }
484    }
485
486    /// Returns the metrics associated with the adapter layer.
487    pub fn metrics(&self) -> &Metrics {
488        &self.metrics
489    }
490
491    /// The current time according to the [`Client`].
492    pub fn now(&self) -> DateTime<Utc> {
493        to_datetime((self.now)())
494    }
495
496    /// Get a metadata and a channel that can be used to append to a webhook source.
497    pub async fn get_webhook_appender(
498        &self,
499        database: String,
500        schema: String,
501        name: String,
502    ) -> Result<AppendWebhookResponse, AppendWebhookError> {
503        let (tx, rx) = oneshot::channel();
504
505        // Send our request.
506        self.send(Command::GetWebhook {
507            database,
508            schema,
509            name,
510            tx,
511        });
512
513        // Using our one shot channel to get the result, returning an error if the sender dropped.
514        let response = rx
515            .await
516            .map_err(|_| anyhow::anyhow!("failed to receive webhook response"))?;
517
518        response
519    }
520
521    /// Gets the current value of all system variables.
522    pub async fn get_system_vars(&self) -> SystemVars {
523        let (tx, rx) = oneshot::channel();
524        self.send(Command::GetSystemVars { tx });
525        rx.await.expect("coordinator unexpectedly gone")
526    }
527
528    #[instrument(level = "debug")]
529    pub(crate) fn send(&self, cmd: Command) {
530        self.inner_cmd_tx
531            .send((OpenTelemetryContext::obtain(), cmd))
532            .expect("coordinator unexpectedly gone");
533    }
534}
535
536/// A coordinator client that is bound to a connection.
537///
538/// See also [`Client`].
539pub struct SessionClient {
540    // Invariant: inner may only be `None` after the session has been terminated.
541    // Once the session is terminated, no communication to the Coordinator
542    // should be attempted.
543    inner: Option<Client>,
544    // Invariant: session may only be `None` during a method call. Every public
545    // method must ensure that `Session` is `Some` before it returns.
546    session: Option<Session>,
547    timeouts: Timeout,
548    segment_client: Option<mz_segment::Client>,
549    environment_id: EnvironmentId,
550    /// Client for frontend peek sequencing; populated at connection startup.
551    peek_client: PeekClient,
552    /// Whether frontend peek sequencing is enabled; initialized at connection startup.
553    // TODO(peek-seq): Currently, this is initialized only at session startup. We'll be able to
554    // check the actual feature flag value at every peek (without a Coordinator call) once we'll
555    // always have a catalog snapshot at hand.
556    pub enable_frontend_peek_sequencing: bool,
557}
558
559impl SessionClient {
560    /// Parses a SQL expression, reporting failures as a telemetry event if
561    /// possible.
562    pub fn parse<'a>(
563        &self,
564        sql: &'a str,
565    ) -> Result<Result<Vec<StatementParseResult<'a>>, ParserStatementError>, String> {
566        match mz_sql::parse::parse_with_limit(sql) {
567            Ok(Err(e)) => {
568                self.track_statement_parse_failure(&e);
569                Ok(Err(e))
570            }
571            r => r,
572        }
573    }
574
575    fn track_statement_parse_failure(&self, parse_error: &ParserStatementError) {
576        let session = self.session.as_ref().expect("session invariant violated");
577        let Some(user_id) = session.user().external_metadata.as_ref().map(|m| m.user_id) else {
578            return;
579        };
580        let Some(segment_client) = &self.segment_client else {
581            return;
582        };
583        let Some(statement_kind) = parse_error.statement else {
584            return;
585        };
586        let Some((action, object_type)) = telemetry::analyze_audited_statement(statement_kind)
587        else {
588            return;
589        };
590        let event_type = StatementFailureType::ParseFailure;
591        let event_name = format!(
592            "{} {} {}",
593            object_type.as_title_case(),
594            action.as_title_case(),
595            event_type.as_title_case(),
596        );
597        segment_client.environment_track(
598            &self.environment_id,
599            event_name,
600            json!({
601                "statement_kind": statement_kind,
602                "error": &parse_error.error,
603            }),
604            EventDetails {
605                user_id: Some(user_id),
606                application_name: Some(session.application_name()),
607                ..Default::default()
608            },
609        );
610    }
611
612    // Verify and return the named prepared statement. We need to verify each use
613    // to make sure the prepared statement is still safe to use.
614    pub async fn get_prepared_statement(
615        &mut self,
616        name: &str,
617    ) -> Result<&PreparedStatement, AdapterError> {
618        let catalog = self.catalog_snapshot("get_prepared_statement").await;
619        Coordinator::verify_prepared_statement(&catalog, self.session(), name)?;
620        Ok(self
621            .session()
622            .get_prepared_statement_unverified(name)
623            .expect("must exist"))
624    }
625
626    /// Saves the parsed statement as a prepared statement.
627    ///
628    /// The prepared statement is saved in the connection's [`crate::session::Session`]
629    /// under the specified name.
630    pub async fn prepare(
631        &mut self,
632        name: String,
633        stmt: Option<Statement<Raw>>,
634        sql: String,
635        param_types: Vec<Option<SqlScalarType>>,
636    ) -> Result<(), AdapterError> {
637        let catalog = self.catalog_snapshot("prepare").await;
638
639        // Note: This failpoint is used to simulate a request outliving the external connection
640        // that made it.
641        let mut async_pause = false;
642        (|| {
643            fail::fail_point!("async_prepare", |val| {
644                async_pause = val.map_or(false, |val| val.parse().unwrap_or(false))
645            });
646        })();
647        if async_pause {
648            tokio::time::sleep(Duration::from_secs(1)).await;
649        };
650
651        let desc = Coordinator::describe(&catalog, self.session(), stmt.clone(), param_types)?;
652        let now = self.now();
653        let state_revision = StateRevision {
654            catalog_revision: catalog.transient_revision(),
655            session_state_revision: self.session().state_revision(),
656        };
657        self.session()
658            .set_prepared_statement(name, stmt, sql, desc, state_revision, now);
659        Ok(())
660    }
661
662    /// Binds a statement to a portal.
663    #[mz_ore::instrument(level = "debug")]
664    pub async fn declare(
665        &mut self,
666        name: String,
667        stmt: Statement<Raw>,
668        sql: String,
669    ) -> Result<(), AdapterError> {
670        let catalog = self.catalog_snapshot("declare").await;
671        let param_types = vec![];
672        let desc =
673            Coordinator::describe(&catalog, self.session(), Some(stmt.clone()), param_types)?;
674        let params = vec![];
675        let result_formats = vec![mz_pgwire_common::Format::Text; desc.arity()];
676        let now = self.now();
677        let logging = self.session().mint_logging(sql, Some(&stmt), now);
678        let state_revision = StateRevision {
679            catalog_revision: catalog.transient_revision(),
680            session_state_revision: self.session().state_revision(),
681        };
682        self.session().set_portal(
683            name,
684            desc,
685            Some(stmt),
686            logging,
687            params,
688            result_formats,
689            state_revision,
690        )?;
691        Ok(())
692    }
693
694    /// Executes a previously-bound portal.
695    ///
696    /// Note: the provided `cancel_future` must be cancel-safe as it's polled in a `select!` loop.
697    ///
698    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
699    /// triggering the execution of the underlying query.
700    #[mz_ore::instrument(level = "debug")]
701    pub async fn execute(
702        &mut self,
703        portal_name: String,
704        cancel_future: impl Future<Output = std::io::Error> + Send,
705        outer_ctx_extra: Option<ExecuteContextGuard>,
706    ) -> Result<(ExecuteResponse, Instant), AdapterError> {
707        let execute_started = Instant::now();
708
709        // Attempt peek sequencing in the session task.
710        // If unsupported, fall back to the Coordinator path.
711        // TODO(peek-seq): wire up cancel_future
712        let mut outer_ctx_extra = outer_ctx_extra;
713        if let Some(resp) = self
714            .try_frontend_peek(&portal_name, &mut outer_ctx_extra)
715            .await?
716        {
717            debug!("frontend peek succeeded");
718            // Frontend peek handled the execution and retired outer_ctx_extra if it existed.
719            // No additional work needed here.
720            return Ok((resp, execute_started));
721        } else {
722            debug!("frontend peek did not happen, falling back to `Command::Execute`");
723            // If we bailed out, outer_ctx_extra is still present (if it was originally).
724            // `Command::Execute` will handle it.
725            // (This is not true if we bailed out _after_ the frontend peek sequencing has already
726            // begun its own statement logging. That case would be a bug.)
727        }
728
729        let response = self
730            .send_with_cancel(
731                |tx, session| Command::Execute {
732                    portal_name,
733                    session,
734                    tx,
735                    outer_ctx_extra,
736                },
737                cancel_future,
738            )
739            .await?;
740        Ok((response, execute_started))
741    }
742
743    fn now(&self) -> EpochMillis {
744        (self.inner().now)()
745    }
746
747    fn now_datetime(&self) -> DateTime<Utc> {
748        to_datetime(self.now())
749    }
750
751    /// Starts a transaction based on implicit:
752    /// - `None`: InTransaction
753    /// - `Some(1)`: Started
754    /// - `Some(n > 1)`: InTransactionImplicit
755    /// - `Some(0)`: no change
756    pub fn start_transaction(&mut self, implicit: Option<usize>) -> Result<(), AdapterError> {
757        let now = self.now_datetime();
758        let session = self.session.as_mut().expect("session invariant violated");
759        let result = match implicit {
760            None => session.start_transaction(now, None, None),
761            Some(stmts) => {
762                session.start_transaction_implicit(now, stmts);
763                Ok(())
764            }
765        };
766        result
767    }
768
769    /// Ends a transaction. Even if an error is returned, guarantees that the transaction in the
770    /// session and Coordinator has cleared its state.
771    #[instrument(level = "debug")]
772    pub async fn end_transaction(
773        &mut self,
774        action: EndTransactionAction,
775    ) -> Result<ExecuteResponse, AdapterError> {
776        let res = self
777            .send(|tx, session| Command::Commit {
778                action,
779                session,
780                tx,
781            })
782            .await;
783        // Commit isn't guaranteed to set the session's state to anything specific, so clear it
784        // here. It's safe to ignore the returned `TransactionStatus` because that doesn't contain
785        // any data that the Coordinator must act on for correctness.
786        let _ = self.session().clear_transaction();
787        res
788    }
789
790    /// Fails a transaction.
791    pub fn fail_transaction(&mut self) {
792        let session = self.session.take().expect("session invariant violated");
793        let session = session.fail_transaction();
794        self.session = Some(session);
795    }
796
797    /// Fetches the catalog.
798    #[instrument(level = "debug")]
799    pub async fn catalog_snapshot(&self, context: &str) -> Arc<Catalog> {
800        let start = std::time::Instant::now();
801        let CatalogSnapshot { catalog } = self
802            .send_without_session(|tx| Command::CatalogSnapshot { tx })
803            .await;
804        self.inner()
805            .metrics()
806            .catalog_snapshot_seconds
807            .with_label_values(&[context])
808            .observe(start.elapsed().as_secs_f64());
809        catalog
810    }
811
812    /// Dumps the catalog to a JSON string.
813    ///
814    /// No authorization is performed, so access to this function must be limited to internal
815    /// servers or superusers.
816    pub async fn dump_catalog(&self) -> Result<CatalogDump, AdapterError> {
817        let catalog = self.catalog_snapshot("dump_catalog").await;
818        catalog.dump().map_err(AdapterError::from)
819    }
820
821    /// Checks the catalog for internal consistency, returning a JSON object describing the
822    /// inconsistencies, if there are any.
823    ///
824    /// No authorization is performed, so access to this function must be limited to internal
825    /// servers or superusers.
826    pub async fn check_catalog(&self) -> Result<(), serde_json::Value> {
827        let catalog = self.catalog_snapshot("check_catalog").await;
828        catalog.check_consistency()
829    }
830
831    /// Checks the coordinator for internal consistency, returning a JSON object describing the
832    /// inconsistencies, if there are any. This is a superset of checks that check_catalog performs,
833    ///
834    /// No authorization is performed, so access to this function must be limited to internal
835    /// servers or superusers.
836    pub async fn check_coordinator(&self) -> Result<(), serde_json::Value> {
837        self.send_without_session(|tx| Command::CheckConsistency { tx })
838            .await
839            .map_err(|inconsistencies| {
840                serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
841                    serde_json::Value::String("failed to serialize inconsistencies".to_string())
842                })
843            })
844    }
845
846    pub async fn dump_coordinator_state(&self) -> Result<serde_json::Value, anyhow::Error> {
847        self.send_without_session(|tx| Command::Dump { tx }).await
848    }
849
850    /// Tells the coordinator a statement has finished execution, in the cases
851    /// where we have no other reason to communicate with the coordinator.
852    pub fn retire_execute(
853        &self,
854        guard: ExecuteContextGuard,
855        reason: StatementEndedExecutionReason,
856    ) {
857        if !guard.is_trivial() {
858            let data = guard.defuse();
859            let cmd = Command::RetireExecute { data, reason };
860            self.inner().send(cmd);
861        }
862    }
863
864    /// Inserts a set of rows into the given table.
865    ///
866    /// The rows only contain the columns positions in `columns`, so they
867    /// must be re-encoded for adding the default values for the remaining
868    /// ones.
869    pub async fn insert_rows(
870        &mut self,
871        target_id: CatalogItemId,
872        target_name: String,
873        columns: Vec<ColumnIndex>,
874        rows: Vec<Row>,
875        ctx_extra: ExecuteContextGuard,
876    ) -> Result<ExecuteResponse, AdapterError> {
877        // TODO: Remove this clone once we always have the session. It's currently needed because
878        // self.session returns a mut ref, so we can't call it twice.
879        let pcx = self.session().pcx().clone();
880
881        let session_meta = self.session().meta();
882
883        let catalog = self.catalog_snapshot("insert_rows").await;
884        let conn_catalog = catalog.for_session(self.session());
885        let catalog_state = conn_catalog.state();
886
887        // Collect optimizer parameters.
888        let optimizer_config = optimize::OptimizerConfig::from(conn_catalog.system_vars());
889        let prep = ExprPrepStyle::OneShot {
890            logical_time: EvalTime::NotAvailable,
891            session: &session_meta,
892            catalog_state,
893        };
894        let mut optimizer =
895            optimize::view::Optimizer::new_with_prep_no_limit(optimizer_config.clone(), None, prep);
896
897        let result: Result<_, AdapterError> = mz_sql::plan::plan_copy_from(
898            &pcx,
899            &conn_catalog,
900            target_id,
901            target_name,
902            columns,
903            rows,
904        )
905        .err_into()
906        .and_then(|values| optimizer.optimize(values).err_into())
907        .and_then(|values| {
908            // Copied rows must always be constants.
909            Coordinator::insert_constant(&catalog, self.session(), target_id, values.into_inner())
910        });
911        self.retire_execute(ctx_extra, (&result).into());
912        result
913    }
914
915    /// Gets the current value of all system variables.
916    pub async fn get_system_vars(&self) -> SystemVars {
917        self.inner().get_system_vars().await
918    }
919
920    /// Updates the specified system variables to the specified values.
921    pub async fn set_system_vars(
922        &mut self,
923        vars: BTreeMap<String, String>,
924    ) -> Result<(), AdapterError> {
925        let conn_id = self.session().conn_id().clone();
926        self.send_without_session(|tx| Command::SetSystemVars { vars, conn_id, tx })
927            .await
928    }
929
930    /// Terminates the client session.
931    pub async fn terminate(&mut self) {
932        let conn_id = self.session().conn_id().clone();
933        let res = self
934            .send_without_session(|tx| Command::Terminate {
935                conn_id,
936                tx: Some(tx),
937            })
938            .await;
939        if let Err(e) = res {
940            // Nothing we can do to handle a failed terminate so we just log and ignore it.
941            error!("Unable to terminate session: {e:?}");
942        }
943        // Prevent any communication with Coordinator after session is terminated.
944        self.inner = None;
945    }
946
947    /// Returns a mutable reference to the session bound to this client.
948    pub fn session(&mut self) -> &mut Session {
949        self.session.as_mut().expect("session invariant violated")
950    }
951
952    /// Returns a reference to the inner client.
953    pub fn inner(&self) -> &Client {
954        self.inner.as_ref().expect("inner invariant violated")
955    }
956
957    async fn send_without_session<T, F>(&self, f: F) -> T
958    where
959        F: FnOnce(oneshot::Sender<T>) -> Command,
960    {
961        let (tx, rx) = oneshot::channel();
962        self.inner().send(f(tx));
963        rx.await.expect("sender dropped")
964    }
965
966    #[instrument(level = "debug")]
967    async fn send<T, F>(&mut self, f: F) -> Result<T, AdapterError>
968    where
969        F: FnOnce(oneshot::Sender<Response<T>>, Session) -> Command,
970    {
971        self.send_with_cancel(f, futures::future::pending()).await
972    }
973
    /// Send a [`Command`] to the Coordinator, with the ability to cancel the command.
    ///
    /// The session is taken out of `self` and handed to `f`, which builds the command. It is put
    /// back into `self.session` when the Coordinator's [`Response`] is received, and the
    /// response's `result` is returned. The `with_guard` closure below is intended to restore the
    /// session if this future is dropped after the Coordinator has already replied.
    ///
    /// If `cancel_future` resolves first, a [`Command::PrivilegedCancelRequest`] for this
    /// connection is sent (at most once). The function then keeps waiting for the Coordinator's
    /// response rather than returning early.
    ///
    /// For `Execute` and `GetWebhook` commands, the `commands` metric is also incremented. It is
    /// labelled with the command type, `"success"`/`"error"`, and the application name hint.
    ///
    /// Note: the provided `cancel_future` must be cancel-safe as it's polled in a `select!` loop.
    ///
    /// # Panics
    ///
    /// Panics if this client has no session or no inner client. Also panics if the response
    /// sender is dropped without replying.
    #[instrument(level = "debug")]
    async fn send_with_cancel<T, F>(
        &mut self,
        f: F,
        cancel_future: impl Future<Output = std::io::Error> + Send,
    ) -> Result<T, AdapterError>
    where
        F: FnOnce(oneshot::Sender<Response<T>>, Session) -> Command,
    {
        // Move the session out; from here until the response arrives, `self.session` is `None`.
        let session = self.session.take().expect("session invariant violated");
        // Metric label for this command, if it is one whose outcome we count (set below).
        let mut typ = None;
        let application_name = session.application_name();
        let name_hint = ApplicationNameHint::from_str(application_name);
        // Captured up front because `session` is moved into the command.
        let conn_id = session.conn_id().clone();
        let (tx, rx) = oneshot::channel();

        // Destructure self so we can borrow the inner client and the session slot independently
        // at the same time.
        let Self {
            inner: inner_client,
            session: client_session,
            ..
        } = self;

        // TODO(parkmycar): Leaking this invariant here doesn't feel great, but calling
        // `self.inner()` doesn't work because then Rust takes a borrow on the entirety of self.
        let inner_client = inner_client.as_ref().expect("inner invariant violated");

        // ~~SPOOKY ZONE~~
        //
        // This guard prevents a race where a `Session` is returned on `rx` but never placed
        // back in `self` because the Future returned by this function is concurrently dropped
        // with the Coordinator sending a response.
        let mut guarded_rx = rx.with_guard(|response: Response<_>| {
            *client_session = Some(response.session);
        });

        inner_client.send({
            let cmd = f(tx, session);
            // Measure the success and error rate of certain commands:
            // - `Execute` is counted under the "execute" label
            // - `GetWebhook` is counted under the "webhook" label
            // Every other variant is listed explicitly (no wildcard), so adding a new `Command`
            // variant forces a decision here.
            match cmd {
                Command::Execute { .. } => typ = Some("execute"),
                Command::GetWebhook { .. } => typ = Some("webhook"),
                Command::Startup { .. }
                | Command::AuthenticatePassword { .. }
                | Command::AuthenticateGetSASLChallenge { .. }
                | Command::AuthenticateVerifySASLProof { .. }
                | Command::CatalogSnapshot { .. }
                | Command::Commit { .. }
                | Command::CancelRequest { .. }
                | Command::PrivilegedCancelRequest { .. }
                | Command::GetSystemVars { .. }
                | Command::SetSystemVars { .. }
                | Command::Terminate { .. }
                | Command::RetireExecute { .. }
                | Command::CheckConsistency { .. }
                | Command::Dump { .. }
                | Command::GetComputeInstanceClient { .. }
                | Command::GetOracle { .. }
                | Command::DetermineRealTimeRecentTimestamp { .. }
                | Command::GetTransactionReadHoldsBundle { .. }
                | Command::StoreTransactionReadHolds { .. }
                | Command::ExecuteSlowPathPeek { .. }
                | Command::CopyToPreflight { .. }
                | Command::ExecuteCopyTo { .. }
                | Command::ExecuteSideEffectingFunc { .. }
                | Command::RegisterFrontendPeek { .. }
                | Command::UnregisterFrontendPeek { .. }
                | Command::ExplainTimestamp { .. }
                | Command::FrontendStatementLogging(..) => {}
            };
            cmd
        });

        let mut cancel_future = pin::pin!(cancel_future);
        // Ensures at most one cancel request is sent per command.
        let mut cancelled = false;
        loop {
            tokio::select! {
                res = &mut guarded_rx => {
                    // We received a result, so drop our guard to drop our borrows.
                    drop(guarded_rx);

                    let res = res.expect("sender dropped");
                    let status = res.result.is_ok().then_some("success").unwrap_or("error");
                    if let Err(err) = res.result.as_ref() {
                        if name_hint.should_trace_errors() {
                            tracing::warn!(?err, ?name_hint, "adapter response error");
                        }
                    }

                    if let Some(typ) = typ {
                        inner_client
                            .metrics
                            .commands
                            .with_label_values(&[typ, status, name_hint.as_str()])
                            .inc();
                    }
                    // Return the session to the client before handing back the result.
                    *client_session = Some(res.session);
                    return res.result;
                },
                _err = &mut cancel_future, if !cancelled => {
                    // Ask the Coordinator to cancel this connection's work, then keep looping to
                    // wait for its response; `cancelled` disables this branch from now on.
                    cancelled = true;
                    inner_client.send(Command::PrivilegedCancelRequest {
                        conn_id: conn_id.clone(),
                    });
                }
            };
        }
    }
1088
1089    pub fn add_idle_in_transaction_session_timeout(&mut self) {
1090        let session = self.session();
1091        let timeout_dur = session.vars().idle_in_transaction_session_timeout();
1092        if !timeout_dur.is_zero() {
1093            let timeout_dur = timeout_dur.clone();
1094            if let Some(txn) = session.transaction().inner() {
1095                let txn_id = txn.id.clone();
1096                let timeout = TimeoutType::IdleInTransactionSession(txn_id);
1097                self.timeouts.add_timeout(timeout, timeout_dur);
1098            }
1099        }
1100    }
1101
1102    pub fn remove_idle_in_transaction_session_timeout(&mut self) {
1103        let session = self.session();
1104        if let Some(txn) = session.transaction().inner() {
1105            let txn_id = txn.id.clone();
1106            self.timeouts
1107                .remove_timeout(&TimeoutType::IdleInTransactionSession(txn_id));
1108        }
1109    }
1110
1111    /// # Cancel safety
1112    ///
1113    /// This method is cancel safe. If `recv` is used as the event in a
1114    /// `tokio::select!` statement and some other branch
1115    /// completes first, it is guaranteed that no messages were received on this
1116    /// channel.
1117    pub async fn recv_timeout(&mut self) -> Option<TimeoutType> {
1118        self.timeouts.recv().await
1119    }
1120
1121    /// Returns a reference to the PeekClient used for frontend peek sequencing.
1122    pub fn peek_client(&self) -> &PeekClient {
1123        &self.peek_client
1124    }
1125
1126    /// Returns a reference to the PeekClient used for frontend peek sequencing.
1127    pub fn peek_client_mut(&mut self) -> &mut PeekClient {
1128        &mut self.peek_client
1129    }
1130
1131    /// Attempt to sequence a peek from the session task.
1132    ///
1133    /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
1134    /// Coordinator's sequencing. If it returns an error, it should be returned to the user.
1135    ///
1136    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
1137    /// triggering the execution of the underlying query.
1138    pub(crate) async fn try_frontend_peek(
1139        &mut self,
1140        portal_name: &str,
1141        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
1142    ) -> Result<Option<ExecuteResponse>, AdapterError> {
1143        if self.enable_frontend_peek_sequencing {
1144            let session = self.session.as_mut().expect("SessionClient invariant");
1145            self.peek_client
1146                .try_frontend_peek(portal_name, session, outer_ctx_extra)
1147                .await
1148        } else {
1149            Ok(None)
1150        }
1151    }
1152}
1153
1154impl Drop for SessionClient {
1155    fn drop(&mut self) {
1156        // We may not have a session if this client was dropped while awaiting
1157        // a response. In this case, it is the coordinator's responsibility to
1158        // terminate the session.
1159        if let Some(session) = self.session.take() {
1160            // We may not have a connection to the Coordinator if the session was
1161            // prematurely terminated, for example due to a timeout.
1162            if let Some(inner) = &self.inner {
1163                inner.send(Command::Terminate {
1164                    conn_id: session.conn_id().clone(),
1165                    tx: None,
1166                })
1167            }
1168        }
1169    }
1170}
1171
1172#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
1173pub enum TimeoutType {
1174    IdleInTransactionSession(TransactionId),
1175}
1176
1177impl Display for TimeoutType {
1178    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1179        match self {
1180            TimeoutType::IdleInTransactionSession(txn_id) => {
1181                writeln!(f, "Idle in transaction session for transaction '{txn_id}'")
1182            }
1183        }
1184    }
1185}
1186
1187impl From<TimeoutType> for AdapterError {
1188    fn from(timeout: TimeoutType) -> Self {
1189        match timeout {
1190            TimeoutType::IdleInTransactionSession(_) => {
1191                AdapterError::IdleInTransactionSessionTimeout
1192            }
1193        }
1194    }
1195}
1196
1197struct Timeout {
1198    tx: mpsc::UnboundedSender<TimeoutType>,
1199    rx: mpsc::UnboundedReceiver<TimeoutType>,
1200    active_timeouts: BTreeMap<TimeoutType, AbortOnDropHandle<()>>,
1201}
1202
1203impl Timeout {
1204    fn new() -> Self {
1205        let (tx, rx) = mpsc::unbounded_channel();
1206        Timeout {
1207            tx,
1208            rx,
1209            active_timeouts: BTreeMap::new(),
1210        }
1211    }
1212
1213    /// # Cancel safety
1214    ///
1215    /// This method is cancel safe. If `recv` is used as the event in a
1216    /// `tokio::select!` statement and some other branch
1217    /// completes first, it is guaranteed that no messages were received on this
1218    /// channel.
1219    ///
1220    /// <https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety>
1221    async fn recv(&mut self) -> Option<TimeoutType> {
1222        self.rx.recv().await
1223    }
1224
1225    fn add_timeout(&mut self, timeout: TimeoutType, duration: Duration) {
1226        let tx = self.tx.clone();
1227        let timeout_key = timeout.clone();
1228        let handle = mz_ore::task::spawn(|| format!("{timeout_key}"), async move {
1229            tokio::time::sleep(duration).await;
1230            let _ = tx.send(timeout);
1231        })
1232        .abort_on_drop();
1233        self.active_timeouts.insert(timeout_key, handle);
1234    }
1235
1236    fn remove_timeout(&mut self, timeout: &TimeoutType) {
1237        self.active_timeouts.remove(timeout);
1238
1239        // Remove the timeout from the rx queue if it exists.
1240        let mut timeouts = Vec::new();
1241        while let Ok(pending_timeout) = self.rx.try_recv() {
1242            if timeout != &pending_timeout {
1243                timeouts.push(pending_timeout);
1244            }
1245        }
1246        for pending_timeout in timeouts {
1247            self.tx.send(pending_timeout).expect("rx is in this struct");
1248        }
1249    }
1250}
1251
/// A wrapper around a Stream of PeekResponseUnary that records when it sees the
/// first row data in the given histogram. It also keeps track of whether we have already observed
/// the end of the underlying stream.
///
/// Only the first `PeekResponseUnary::Rows` message triggers an observation; other message kinds
/// pass through without being recorded.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct RecordFirstRowStream {
    /// The underlying stream of rows. Excluded from `Debug` output; the trait object's bounds
    /// do not include `Debug`.
    #[derivative(Debug = "ignore")]
    pub rows: Box<dyn Stream<Item = PeekResponseUnary> + Unpin + Send + Sync>,
    /// The Instant when execution started.
    pub execute_started: Instant,
    /// The histogram where the time since `execute_started` will be recorded when we see the first
    /// row.
    pub time_to_first_row_seconds: Histogram,
    /// Whether we've seen any rows (i.e. a `Rows` message), and hence already recorded the
    /// time-to-first-row observation.
    pub saw_rows: bool,
    /// The Instant when we saw the first row; `None` until then.
    pub recorded_first_row_instant: Option<Instant>,
    /// Whether we have already observed the end of the underlying stream (it yielded `None`).
    pub no_more_rows: bool,
}
1273
1274impl RecordFirstRowStream {
1275    /// Create a new [`RecordFirstRowStream`]
1276    pub fn new(
1277        rows: Box<dyn Stream<Item = PeekResponseUnary> + Unpin + Send + Sync>,
1278        execute_started: Instant,
1279        client: &SessionClient,
1280        instance_id: Option<ComputeInstanceId>,
1281        strategy: Option<StatementExecutionStrategy>,
1282    ) -> Self {
1283        let histogram = Self::histogram(client, instance_id, strategy);
1284        Self {
1285            rows,
1286            execute_started,
1287            time_to_first_row_seconds: histogram,
1288            saw_rows: false,
1289            recorded_first_row_instant: None,
1290            no_more_rows: false,
1291        }
1292    }
1293
1294    fn histogram(
1295        client: &SessionClient,
1296        instance_id: Option<ComputeInstanceId>,
1297        strategy: Option<StatementExecutionStrategy>,
1298    ) -> Histogram {
1299        let isolation_level = *client
1300            .session
1301            .as_ref()
1302            .expect("session invariant")
1303            .vars()
1304            .transaction_isolation();
1305        let instance = match instance_id {
1306            Some(i) => Cow::Owned(i.to_string()),
1307            None => Cow::Borrowed("none"),
1308        };
1309        let strategy = match strategy {
1310            Some(s) => s.name(),
1311            None => "none",
1312        };
1313
1314        client
1315            .inner()
1316            .metrics()
1317            .time_to_first_row_seconds
1318            .with_label_values(&[instance.as_ref(), isolation_level.as_str(), strategy])
1319    }
1320
1321    /// If you want to match [`RecordFirstRowStream`]'s logic but don't need
1322    /// a UnboundedReceiver, you can tell it when to record an observation.
1323    pub fn record(
1324        execute_started: Instant,
1325        client: &SessionClient,
1326        instance_id: Option<ComputeInstanceId>,
1327        strategy: Option<StatementExecutionStrategy>,
1328    ) {
1329        Self::histogram(client, instance_id, strategy)
1330            .observe(execute_started.elapsed().as_secs_f64());
1331    }
1332
1333    pub async fn recv(&mut self) -> Option<PeekResponseUnary> {
1334        let msg = self.rows.next().await;
1335        if !self.saw_rows && matches!(msg, Some(PeekResponseUnary::Rows(_))) {
1336            self.saw_rows = true;
1337            self.time_to_first_row_seconds
1338                .observe(self.execute_started.elapsed().as_secs_f64());
1339            self.recorded_first_row_instant = Some(Instant::now());
1340        }
1341        if msg.is_none() {
1342            self.no_more_rows = true;
1343        }
1344        msg
1345    }
1346}