// mz_adapter/client.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::borrow::Cow;
11use std::collections::BTreeMap;
12use std::fmt::{Debug, Display, Formatter};
13use std::future::Future;
14use std::pin::{self, Pin};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use anyhow::bail;
19use chrono::{DateTime, Utc};
20use derivative::Derivative;
21use futures::{Stream, StreamExt};
22use itertools::Itertools;
23use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
24use mz_auth::Authenticated;
25use mz_auth::password::Password;
26use mz_build_info::BuildInfo;
27use mz_compute_types::ComputeInstanceId;
28use mz_ore::channel::OneshotReceiverExt;
29use mz_ore::collections::CollectionExt;
30use mz_ore::id_gen::{IdAllocator, IdAllocatorInnerBitSet, MAX_ORG_ID, org_id_conn_bits};
31use mz_ore::instrument;
32use mz_ore::now::{EpochMillis, NowFn, to_datetime};
33use mz_ore::result::ResultExt;
34use mz_ore::task::AbortOnDropHandle;
35use mz_ore::thread::JoinOnDropHandle;
36use mz_ore::tracing::OpenTelemetryContext;
37use mz_repr::user::InternalUserMetadata;
38use mz_repr::{CatalogItemId, ColumnIndex, Row, SqlScalarType};
39use mz_sql::ast::{Raw, Statement};
40use mz_sql::catalog::{EnvironmentId, SessionCatalog};
41use mz_sql::session::hint::ApplicationNameHint;
42use mz_sql::session::metadata::SessionMetadata;
43use mz_sql::session::user::SUPPORT_USER;
44use mz_sql::session::vars::{
45    CLUSTER, ENABLE_FRONTEND_PEEK_SEQUENCING, OwnedVarInput, SystemVars, Var,
46};
47use mz_sql_parser::parser::{ParserStatementError, StatementParseResult};
48use prometheus::Histogram;
49use serde_json::json;
50use tokio::sync::{mpsc, oneshot};
51use tracing::{debug, error};
52use uuid::Uuid;
53
54use crate::catalog::Catalog;
55use crate::command::{
56    CatalogDump, CatalogSnapshot, Command, ExecuteResponse, Response, SASLChallengeResponse,
57    SASLVerifyProofResponse, SuperuserAttribute,
58};
59use crate::coord::{Coordinator, ExecuteContextGuard};
60use crate::error::AdapterError;
61use crate::metrics::Metrics;
62use crate::optimize::dataflows::{EvalTime, ExprPrepOneShot};
63use crate::optimize::{self, Optimize};
64use crate::session::{
65    EndTransactionAction, PreparedStatement, Session, SessionConfig, StateRevision, TransactionId,
66};
67use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
68use crate::telemetry::{self, EventDetails, SegmentClientExt, StatementFailureType};
69use crate::webhook::AppendWebhookResponse;
70use crate::{AdapterNotice, AppendWebhookError, PeekClient, PeekResponseUnary, StartupResponse};
71
/// A handle to a running coordinator.
///
/// The coordinator runs on its own thread. Dropping the handle will wait for
/// the coordinator's thread to exit, which will only occur after all
/// outstanding [`Client`]s for the coordinator have dropped.
pub struct Handle {
    /// Unique id minted at coordinator boot; see [`Handle::session_id`].
    pub(crate) session_id: Uuid,
    /// Instant at which the coordinator booted; see [`Handle::start_instant`].
    pub(crate) start_instant: Instant,
    /// Join handle for the coordinator's thread. Dropping it joins the thread,
    /// which is what makes dropping `Handle` block until the coordinator exits.
    pub(crate) _thread: JoinOnDropHandle<()>,
}
82
impl Handle {
    /// Returns the session ID associated with this coordinator.
    ///
    /// The session ID is generated on coordinator boot. It lasts for the
    /// lifetime of the coordinator. Restarting the coordinator will result
    /// in a new session ID.
    pub fn session_id(&self) -> Uuid {
        self.session_id
    }

    /// Returns the instant at which the coordinator booted.
    pub fn start_instant(&self) -> Instant {
        self.start_instant
    }
}
98
/// A coordinator client.
///
/// A coordinator client is a simple handle to a communication channel with the
/// coordinator. It can be cheaply cloned.
///
/// Clients keep the coordinator alive. The coordinator will not exit until all
/// outstanding clients have dropped.
#[derive(Debug, Clone)]
pub struct Client {
    /// Build metadata baked into the binary; handed to each new `Session`.
    build_info: &'static BuildInfo,
    /// Channel over which commands (tagged with the current tracing context)
    /// are sent to the coordinator task.
    inner_cmd_tx: mpsc::UnboundedSender<(OpenTelemetryContext, Command)>,
    /// Allocator for connection ids; see `Client::new` for the bit layout.
    id_alloc: IdAllocator<IdAllocatorInnerBitSet>,
    /// Wall-clock source (milliseconds since the Unix epoch).
    now: NowFn,
    /// Adapter-layer metrics.
    metrics: Metrics,
    /// Identity of the environment this coordinator serves.
    environment_id: EnvironmentId,
    /// Optional telemetry client; when `None`, telemetry events are skipped.
    segment_client: Option<mz_segment::Client>,
}
116
117impl Client {
118    pub(crate) fn new(
119        build_info: &'static BuildInfo,
120        cmd_tx: mpsc::UnboundedSender<(OpenTelemetryContext, Command)>,
121        metrics: Metrics,
122        now: NowFn,
123        environment_id: EnvironmentId,
124        segment_client: Option<mz_segment::Client>,
125    ) -> Client {
126        // Connection ids are 32 bits and have 3 parts.
127        // 1. MSB bit is always 0 because these are interpreted as an i32, and it is possible some
128        //    driver will not handle a negative id since postgres has never produced one because it
129        //    uses process ids.
130        // 2. Next 12 bits are the lower 12 bits of the org id. This allows balancerd to route
131        //    incoming cancel messages to a subset of the environments.
132        // 3. Last 19 bits are random.
133        let env_lower = org_id_conn_bits(&environment_id.organization_id());
134        Client {
135            build_info,
136            inner_cmd_tx: cmd_tx,
137            id_alloc: IdAllocator::new(1, MAX_ORG_ID, env_lower),
138            now,
139            metrics,
140            environment_id,
141            segment_client,
142        }
143    }
144
145    /// Allocates a client for an incoming connection.
146    pub fn new_conn_id(&self) -> Result<ConnectionId, AdapterError> {
147        self.id_alloc.alloc().ok_or(AdapterError::IdExhaustionError)
148    }
149
150    /// Creates a new session associated with this client for the given user.
151    ///
152    /// It is the caller's responsibility to have authenticated the user.
153    /// We pass in an Authenticated marker as a guardrail to ensure the
154    /// user has authenticated with an authenticator before creating a session.
155    pub fn new_session(&self, config: SessionConfig, _authenticated: Authenticated) -> Session {
156        // We use the system clock to determine when a session connected to Materialize. This is not
157        // intended to be 100% accurate and correct, so we don't burden the timestamp oracle with
158        // generating a more correct timestamp.
159        Session::new(self.build_info, config, self.metrics().session_metrics())
160    }
161
162    /// Verifies the provided user's password against the
163    /// stored credentials in the catalog.
164    pub async fn authenticate(
165        &self,
166        user: &String,
167        password: &Password,
168    ) -> Result<Authenticated, AdapterError> {
169        let (tx, rx) = oneshot::channel();
170        self.send(Command::AuthenticatePassword {
171            role_name: user.to_string(),
172            password: Some(password.clone()),
173            tx,
174        });
175        rx.await.expect("sender dropped")?;
176        Ok(Authenticated)
177    }
178
179    pub async fn generate_sasl_challenge(
180        &self,
181        user: &String,
182        client_nonce: &String,
183    ) -> Result<SASLChallengeResponse, AdapterError> {
184        let (tx, rx) = oneshot::channel();
185        self.send(Command::AuthenticateGetSASLChallenge {
186            role_name: user.to_string(),
187            nonce: client_nonce.to_string(),
188            tx,
189        });
190        let response = rx.await.expect("sender dropped")?;
191        Ok(response)
192    }
193
194    pub async fn verify_sasl_proof(
195        &self,
196        user: &String,
197        proof: &String,
198        nonce: &String,
199        mock_hash: &String,
200    ) -> Result<(SASLVerifyProofResponse, Authenticated), AdapterError> {
201        let (tx, rx) = oneshot::channel();
202        self.send(Command::AuthenticateVerifySASLProof {
203            role_name: user.to_string(),
204            proof: proof.to_string(),
205            auth_message: nonce.to_string(),
206            mock_hash: mock_hash.to_string(),
207            tx,
208        });
209        let response = rx.await.expect("sender dropped")?;
210        Ok((response, Authenticated))
211    }
212
213    /// Upgrades this client to a session client.
214    ///
215    /// A session is a connection that has successfully negotiated parameters,
216    /// like the user. Most coordinator operations are available only after
217    /// upgrading a connection to a session.
218    ///
219    /// Returns a new client that is bound to the session and a response
220    /// containing various details about the startup.
221    #[mz_ore::instrument(level = "debug")]
222    pub async fn startup(&self, session: Session) -> Result<SessionClient, AdapterError> {
223        let user = session.user().clone();
224        let conn_id = session.conn_id().clone();
225        let secret_key = session.secret_key();
226        let uuid = session.uuid();
227        let client_ip = session.client_ip();
228        let application_name = session.application_name().into();
229        let notice_tx = session.retain_notice_transmitter();
230
231        let (tx, rx) = oneshot::channel();
232
233        // ~~SPOOKY ZONE~~
234        //
235        // This guard prevents a race where the startup command finishes, but the Future returned
236        // by this function is concurrently dropped, so we never create a `SessionClient` and thus
237        // never cleanup the initialized Session.
238        let rx = rx.with_guard(|_| {
239            self.send(Command::Terminate {
240                conn_id: conn_id.clone(),
241                tx: None,
242            });
243        });
244
245        self.send(Command::Startup {
246            tx,
247            user,
248            conn_id: conn_id.clone(),
249            secret_key,
250            uuid,
251            client_ip: client_ip.copied(),
252            application_name,
253            notice_tx,
254        });
255
256        // When startup fails, no need to call terminate (handle_startup does this). Delay creating
257        // the client until after startup to sidestep the panic in its `Drop` implementation.
258        let response = rx.await.expect("sender dropped")?;
259
260        // Create the client as soon as startup succeeds (before any await points) so its `Drop` can
261        // handle termination.
262        // Build the PeekClient with controller handles returned from startup.
263        let StartupResponse {
264            role_id,
265            write_notify,
266            session_defaults,
267            catalog,
268            storage_collections,
269            transient_id_gen,
270            optimizer_metrics,
271            persist_client,
272            statement_logging_frontend,
273            superuser_attribute,
274        } = response;
275
276        let peek_client = PeekClient::new(
277            self.clone(),
278            storage_collections,
279            transient_id_gen,
280            optimizer_metrics,
281            persist_client,
282            statement_logging_frontend,
283        );
284
285        let mut client = SessionClient {
286            inner: Some(self.clone()),
287            session: Some(session),
288            timeouts: Timeout::new(),
289            environment_id: self.environment_id.clone(),
290            segment_client: self.segment_client.clone(),
291            peek_client,
292            enable_frontend_peek_sequencing: false, // initialized below, once we have a ConnCatalog
293        };
294
295        let session = client.session();
296
297        // Apply the superuser attribute to the session's user if
298        // it exists.
299        if let SuperuserAttribute(Some(superuser)) = superuser_attribute {
300            session.apply_internal_user_metadata(InternalUserMetadata { superuser });
301        }
302
303        session.initialize_role_metadata(role_id);
304        let vars_mut = session.vars_mut();
305        for (name, val) in session_defaults {
306            if let Err(err) = vars_mut.set_default(&name, val.borrow()) {
307                // Note: erroring here is unexpected, but we don't want to panic if somehow our
308                // assumptions are wrong.
309                tracing::error!("failed to set peristed default, {err:?}");
310            }
311        }
312        session
313            .vars_mut()
314            .end_transaction(EndTransactionAction::Commit);
315
316        // Stash the future that notifies us of builtin table writes completing, we'll block on
317        // this future before allowing queries from this session against relevant relations.
318        //
319        // Note: We stash the future as opposed to waiting on it here to prevent blocking session
320        // creation on builtin table updates. This improves the latency for session creation and
321        // reduces scheduling load on any dataflows that read from these builtin relations, since
322        // it allows updates to be batched.
323        session.set_builtin_table_updates(write_notify);
324
325        let catalog = catalog.for_session(session);
326
327        let cluster_active = session.vars().cluster().to_string();
328        if session.vars().welcome_message() {
329            let cluster_info = if catalog.resolve_cluster(Some(&cluster_active)).is_err() {
330                format!("{cluster_active} (does not exist)")
331            } else {
332                cluster_active.to_string()
333            };
334
335            // Emit a welcome message, optimized for readability by humans using
336            // interactive tools. If you change the message, make sure that it
337            // formats nicely in both `psql` and the console's SQL shell.
338            session.add_notice(AdapterNotice::Welcome(format!(
339                "connected to Materialize v{}
340  Org ID: {}
341  Region: {}
342  User: {}
343  Cluster: {}
344  Database: {}
345  {}
346  Session UUID: {}
347
348Issue a SQL query to get started. Need help?
349  View documentation: https://materialize.com/s/docs
350  Join our Slack community: https://materialize.com/s/chat
351    ",
352                session.vars().build_info().semver_version(),
353                self.environment_id.organization_id(),
354                self.environment_id.region(),
355                session.vars().user().name,
356                cluster_info,
357                session.vars().database(),
358                match session.vars().search_path() {
359                    [schema] => format!("Schema: {}", schema),
360                    schemas => format!(
361                        "Search path: {}",
362                        schemas.iter().map(|id| id.to_string()).join(", ")
363                    ),
364                },
365                session.uuid(),
366            )));
367        }
368
369        if session.vars().current_object_missing_warnings() {
370            if catalog.active_database().is_none() {
371                let db = session.vars().database().into();
372                session.add_notice(AdapterNotice::UnknownSessionDatabase(db));
373            }
374        }
375
376        // Users stub their toe on their default cluster not existing, so we provide a notice to
377        // help guide them on what do to.
378        let cluster_var = session
379            .vars()
380            .inspect(CLUSTER.name())
381            .expect("cluster should exist");
382        if session.vars().current_object_missing_warnings()
383            && catalog.resolve_cluster(Some(&cluster_active)).is_err()
384        {
385            let cluster_notice = 'notice: {
386                if cluster_var.inspect_session_value().is_some() {
387                    break 'notice Some(AdapterNotice::DefaultClusterDoesNotExist {
388                        name: cluster_active,
389                        kind: "session",
390                        suggested_action: "Pick an extant cluster with SET CLUSTER = name. Run SHOW CLUSTERS to see available clusters.".into(),
391                    });
392                }
393
394                let role_default = catalog.get_role(catalog.active_role_id());
395                let role_cluster = match role_default.vars().get(CLUSTER.name()) {
396                    Some(OwnedVarInput::Flat(name)) => Some(name),
397                    None => None,
398                    // This is unexpected!
399                    Some(v @ OwnedVarInput::SqlSet(_)) => {
400                        tracing::warn!(?v, "SqlSet found for cluster Role Default");
401                        break 'notice None;
402                    }
403                };
404
405                let alter_role = "with `ALTER ROLE <role> SET cluster TO <cluster>;`";
406                match role_cluster {
407                    // If there is no default, suggest a Role default.
408                    None => Some(AdapterNotice::DefaultClusterDoesNotExist {
409                        name: cluster_active,
410                        kind: "system",
411                        suggested_action: format!(
412                            "Set a default cluster for the current role {alter_role}."
413                        ),
414                    }),
415                    // If the default does not exist, suggest to change it.
416                    Some(_) => Some(AdapterNotice::DefaultClusterDoesNotExist {
417                        name: cluster_active,
418                        kind: "role",
419                        suggested_action: format!(
420                            "Change the default cluster for the current role {alter_role}."
421                        ),
422                    }),
423                }
424            };
425
426            if let Some(notice) = cluster_notice {
427                session.add_notice(notice);
428            }
429        }
430
431        client.enable_frontend_peek_sequencing = ENABLE_FRONTEND_PEEK_SEQUENCING
432            .require(catalog.system_vars())
433            .is_ok();
434
435        Ok(client)
436    }
437
438    /// Cancels the query currently running on the specified connection.
439    pub fn cancel_request(&self, conn_id: ConnectionIdType, secret_key: u32) {
440        self.send(Command::CancelRequest {
441            conn_id,
442            secret_key,
443        });
444    }
445
    /// Executes a single SQL statement that returns rows as the
    /// `mz_support` user.
    ///
    /// Creates a throwaway session for the support user, parses and declares
    /// the statement on the unnamed portal, executes it, and returns the
    /// resulting row stream.
    ///
    /// # Errors
    ///
    /// Errors if `sql` is not exactly one statement, if any
    /// startup/declare/execute step fails, or if the response is not a
    /// streaming row response.
    pub async fn support_execute_one(
        &self,
        sql: &str,
    ) -> Result<Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send>>, anyhow::Error> {
        // Connect to the coordinator.
        let conn_id = self.new_conn_id()?;
        let session = self.new_session(
            SessionConfig {
                conn_id,
                uuid: Uuid::new_v4(),
                user: SUPPORT_USER.name.clone(),
                client_ip: None,
                external_metadata_rx: None,
                helm_chart_version: None,
            },
            // Internal caller acting as the support user; no external
            // authentication happens here.
            Authenticated,
        );
        let mut session_client = self.startup(session).await?;

        // Parse the SQL statement.
        let stmts = mz_sql::parse::parse(sql)?;
        if stmts.len() != 1 {
            bail!("must supply exactly one query");
        }
        let StatementParseResult { ast: stmt, sql } = stmts.into_element();

        // Use the unnamed ("") portal, inside an implicit transaction.
        const EMPTY_PORTAL: &str = "";
        session_client.start_transaction(Some(1))?;
        session_client
            .declare(EMPTY_PORTAL.into(), stmt, sql.to_string())
            .await?;

        match session_client
            // `pending()` means execution is never externally cancelled.
            .execute(EMPTY_PORTAL.into(), futures::future::pending(), None)
            .await?
        {
            (ExecuteResponse::SendingRowsStreaming { mut rows, .. }, _) => {
                // We have to only drop the session client _after_ we read the
                // result. Otherwise the peek will get cancelled right when we
                // drop the session client. So we wrap it up in an extra stream
                // like this, which owns the client and can return it.
                let owning_response_stream = async_stream::stream! {
                    while let Some(rows) = rows.next().await {
                        yield rows;
                    }
                    drop(session_client);
                };
                Ok(Box::pin(owning_response_stream))
            }
            r => bail!("unsupported response type: {r:?}"),
        }
    }
500
    /// Returns the metrics associated with the adapter layer.
    pub fn metrics(&self) -> &Metrics {
        &self.metrics
    }
505
506    /// The current time according to the [`Client`].
507    pub fn now(&self) -> DateTime<Utc> {
508        to_datetime((self.now)())
509    }
510
511    /// Get a metadata and a channel that can be used to append to a webhook source.
512    pub async fn get_webhook_appender(
513        &self,
514        database: String,
515        schema: String,
516        name: String,
517    ) -> Result<AppendWebhookResponse, AppendWebhookError> {
518        let (tx, rx) = oneshot::channel();
519
520        // Send our request.
521        self.send(Command::GetWebhook {
522            database,
523            schema,
524            name,
525            tx,
526        });
527
528        // Using our one shot channel to get the result, returning an error if the sender dropped.
529        let response = rx
530            .await
531            .map_err(|_| anyhow::anyhow!("failed to receive webhook response"))?;
532
533        response
534    }
535
536    /// Gets the current value of all system variables.
537    pub async fn get_system_vars(&self) -> SystemVars {
538        let (tx, rx) = oneshot::channel();
539        self.send(Command::GetSystemVars { tx });
540        rx.await.expect("coordinator unexpectedly gone")
541    }
542
543    #[instrument(level = "debug")]
544    pub(crate) fn send(&self, cmd: Command) {
545        self.inner_cmd_tx
546            .send((OpenTelemetryContext::obtain(), cmd))
547            .expect("coordinator unexpectedly gone");
548    }
549}
550
/// A coordinator client that is bound to a connection.
///
/// See also [`Client`].
pub struct SessionClient {
    // Invariant: inner may only be `None` after the session has been terminated.
    // Once the session is terminated, no communication to the Coordinator
    // should be attempted.
    inner: Option<Client>,
    // Invariant: session may only be `None` during a method call. Every public
    // method must ensure that `Session` is `Some` before it returns.
    session: Option<Session>,
    // Timeout state for this connection (see `Timeout`).
    timeouts: Timeout,
    // Optional telemetry client, used e.g. to report statement parse failures.
    segment_client: Option<mz_segment::Client>,
    // Identity of the environment; used to tag telemetry events.
    environment_id: EnvironmentId,
    /// Client for frontend peek sequencing; populated at connection startup.
    peek_client: PeekClient,
    /// Whether frontend peek sequencing is enabled; initialized at connection startup.
    // TODO(peek-seq): Currently, this is initialized only at session startup. We'll be able to
    // check the actual feature flag value at every peek (without a Coordinator call) once we'll
    // always have a catalog snapshot at hand.
    pub enable_frontend_peek_sequencing: bool,
}
573
574impl SessionClient {
575    /// Parses a SQL expression, reporting failures as a telemetry event if
576    /// possible.
577    pub fn parse<'a>(
578        &self,
579        sql: &'a str,
580    ) -> Result<Result<Vec<StatementParseResult<'a>>, ParserStatementError>, String> {
581        match mz_sql::parse::parse_with_limit(sql) {
582            Ok(Err(e)) => {
583                self.track_statement_parse_failure(&e);
584                Ok(Err(e))
585            }
586            r => r,
587        }
588    }
589
    /// Reports a statement parse failure as a telemetry event.
    ///
    /// Silently returns if any prerequisite is missing: the user has no
    /// external metadata (no user id), telemetry is disabled (no segment
    /// client), the error carries no statement kind, or the statement kind is
    /// not one we audit.
    fn track_statement_parse_failure(&self, parse_error: &ParserStatementError) {
        let session = self.session.as_ref().expect("session invariant violated");
        let Some(user_id) = session.user().external_metadata.as_ref().map(|m| m.user_id) else {
            return;
        };
        let Some(segment_client) = &self.segment_client else {
            return;
        };
        let Some(statement_kind) = parse_error.statement else {
            return;
        };
        let Some((action, object_type)) = telemetry::analyze_audited_statement(statement_kind)
        else {
            return;
        };
        let event_type = StatementFailureType::ParseFailure;
        // Event name: "<ObjectType> <Action> <FailureType>", title-cased.
        let event_name = format!(
            "{} {} {}",
            object_type.as_title_case(),
            action.as_title_case(),
            event_type.as_title_case(),
        );
        segment_client.environment_track(
            &self.environment_id,
            event_name,
            json!({
                "statement_kind": statement_kind,
                "error": &parse_error.error,
            }),
            EventDetails {
                user_id: Some(user_id),
                application_name: Some(session.application_name()),
                ..Default::default()
            },
        );
    }
626
627    // Verify and return the named prepared statement. We need to verify each use
628    // to make sure the prepared statement is still safe to use.
629    pub async fn get_prepared_statement(
630        &mut self,
631        name: &str,
632    ) -> Result<&PreparedStatement, AdapterError> {
633        let catalog = self.catalog_snapshot("get_prepared_statement").await;
634        Coordinator::verify_prepared_statement(&catalog, self.session(), name)?;
635        Ok(self
636            .session()
637            .get_prepared_statement_unverified(name)
638            .expect("must exist"))
639    }
640
    /// Saves the parsed statement as a prepared statement.
    ///
    /// The prepared statement is saved in the connection's [`crate::session::Session`]
    /// under the specified name.
    ///
    /// # Errors
    ///
    /// Errors if describing the statement against the current catalog fails.
    pub async fn prepare(
        &mut self,
        name: String,
        stmt: Option<Statement<Raw>>,
        sql: String,
        param_types: Vec<Option<SqlScalarType>>,
    ) -> Result<(), AdapterError> {
        let catalog = self.catalog_snapshot("prepare").await;

        // Note: This failpoint is used to simulate a request outliving the external connection
        // that made it.
        let mut async_pause = false;
        // NOTE(review): the immediately-invoked closure presumably isolates
        // control flow the failpoint macro may expand to — confirm.
        (|| {
            fail::fail_point!("async_prepare", |val| {
                async_pause = val.map_or(false, |val| val.parse().unwrap_or(false))
            });
        })();
        if async_pause {
            tokio::time::sleep(Duration::from_secs(1)).await;
        };

        // Describe the statement, then stash it on the session along with the
        // catalog/session revisions current at prepare time.
        let desc = Coordinator::describe(&catalog, self.session(), stmt.clone(), param_types)?;
        let now = self.now();
        let state_revision = StateRevision {
            catalog_revision: catalog.transient_revision(),
            session_state_revision: self.session().state_revision(),
        };
        self.session()
            .set_prepared_statement(name, stmt, sql, desc, state_revision, now);
        Ok(())
    }
676
677    /// Binds a statement to a portal.
678    #[mz_ore::instrument(level = "debug")]
679    pub async fn declare(
680        &mut self,
681        name: String,
682        stmt: Statement<Raw>,
683        sql: String,
684    ) -> Result<(), AdapterError> {
685        let catalog = self.catalog_snapshot("declare").await;
686        let param_types = vec![];
687        let desc =
688            Coordinator::describe(&catalog, self.session(), Some(stmt.clone()), param_types)?;
689        let params = vec![];
690        let result_formats = vec![mz_pgwire_common::Format::Text; desc.arity()];
691        let now = self.now();
692        let logging = self.session().mint_logging(sql, Some(&stmt), now);
693        let state_revision = StateRevision {
694            catalog_revision: catalog.transient_revision(),
695            session_state_revision: self.session().state_revision(),
696        };
697        self.session().set_portal(
698            name,
699            desc,
700            Some(stmt),
701            logging,
702            params,
703            result_formats,
704            state_revision,
705        )?;
706        Ok(())
707    }
708
    /// Executes a previously-bound portal.
    ///
    /// Note: the provided `cancel_future` must be cancel-safe as it's polled in a `select!` loop.
    ///
    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
    /// triggering the execution of the underlying query.
    ///
    /// Returns the response together with the `Instant` captured at the start
    /// of this call.
    #[mz_ore::instrument(level = "debug")]
    pub async fn execute(
        &mut self,
        portal_name: String,
        cancel_future: impl Future<Output = std::io::Error> + Send,
        outer_ctx_extra: Option<ExecuteContextGuard>,
    ) -> Result<(ExecuteResponse, Instant), AdapterError> {
        let execute_started = Instant::now();

        // Attempt peek sequencing in the session task.
        // If unsupported, fall back to the Coordinator path.
        // TODO(peek-seq): wire up cancel_future
        let mut outer_ctx_extra = outer_ctx_extra;
        if let Some(resp) = self
            .try_frontend_peek(&portal_name, &mut outer_ctx_extra)
            .await?
        {
            debug!("frontend peek succeeded");
            // Frontend peek handled the execution and retired outer_ctx_extra if it existed.
            // No additional work needed here.
            return Ok((resp, execute_started));
        } else {
            debug!("frontend peek did not happen, falling back to `Command::Execute`");
            // If we bailed out, outer_ctx_extra is still present (if it was originally).
            // `Command::Execute` will handle it.
            // (This is not true if we bailed out _after_ the frontend peek sequencing has already
            // begun its own statement logging. That case would be a bug.)
        }

        // Coordinator path: ship the portal to the coordinator, racing the
        // response against `cancel_future`.
        let response = self
            .send_with_cancel(
                |tx, session| Command::Execute {
                    portal_name,
                    session,
                    tx,
                    outer_ctx_extra,
                },
                cancel_future,
            )
            .await?;
        Ok((response, execute_started))
    }
757
758    fn now(&self) -> EpochMillis {
759        (self.inner().now)()
760    }
761
762    fn now_datetime(&self) -> DateTime<Utc> {
763        to_datetime(self.now())
764    }
765
766    /// Starts a transaction based on implicit:
767    /// - `None`: InTransaction
768    /// - `Some(1)`: Started
769    /// - `Some(n > 1)`: InTransactionImplicit
770    /// - `Some(0)`: no change
771    pub fn start_transaction(&mut self, implicit: Option<usize>) -> Result<(), AdapterError> {
772        let now = self.now_datetime();
773        let session = self.session.as_mut().expect("session invariant violated");
774        let result = match implicit {
775            None => session.start_transaction(now, None, None),
776            Some(stmts) => {
777                session.start_transaction_implicit(now, stmts);
778                Ok(())
779            }
780        };
781        result
782    }
783
    /// Ends a transaction. Even if an error is returned, guarantees that the transaction in the
    /// session and Coordinator has cleared its state.
    #[instrument(level = "debug")]
    pub async fn end_transaction(
        &mut self,
        action: EndTransactionAction,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Ask the Coordinator to commit or abort; the session travels with the
        // command and is restored by `send` when the response arrives.
        let res = self
            .send(|tx, session| Command::Commit {
                action,
                session,
                tx,
            })
            .await;
        // Commit isn't guaranteed to set the session's state to anything specific, so clear it
        // here. It's safe to ignore the returned `TransactionStatus` because that doesn't contain
        // any data that the Coordinator must act on for correctness.
        let _ = self.session().clear_transaction();
        res
    }
804
805    /// Fails a transaction.
806    pub fn fail_transaction(&mut self) {
807        let session = self.session.take().expect("session invariant violated");
808        let session = session.fail_transaction();
809        self.session = Some(session);
810    }
811
812    /// Fetches the catalog.
813    #[instrument(level = "debug")]
814    pub async fn catalog_snapshot(&self, context: &str) -> Arc<Catalog> {
815        let start = std::time::Instant::now();
816        let CatalogSnapshot { catalog } = self
817            .send_without_session(|tx| Command::CatalogSnapshot { tx })
818            .await;
819        self.inner()
820            .metrics()
821            .catalog_snapshot_seconds
822            .with_label_values(&[context])
823            .observe(start.elapsed().as_secs_f64());
824        catalog
825    }
826
827    /// Dumps the catalog to a JSON string.
828    ///
829    /// No authorization is performed, so access to this function must be limited to internal
830    /// servers or superusers.
831    pub async fn dump_catalog(&self) -> Result<CatalogDump, AdapterError> {
832        let catalog = self.catalog_snapshot("dump_catalog").await;
833        catalog.dump().map_err(AdapterError::from)
834    }
835
836    /// Checks the catalog for internal consistency, returning a JSON object describing the
837    /// inconsistencies, if there are any.
838    ///
839    /// No authorization is performed, so access to this function must be limited to internal
840    /// servers or superusers.
841    pub async fn check_catalog(&self) -> Result<(), serde_json::Value> {
842        let catalog = self.catalog_snapshot("check_catalog").await;
843        catalog.check_consistency()
844    }
845
846    /// Checks the coordinator for internal consistency, returning a JSON object describing the
847    /// inconsistencies, if there are any. This is a superset of checks that check_catalog performs,
848    ///
849    /// No authorization is performed, so access to this function must be limited to internal
850    /// servers or superusers.
851    pub async fn check_coordinator(&self) -> Result<(), serde_json::Value> {
852        self.send_without_session(|tx| Command::CheckConsistency { tx })
853            .await
854            .map_err(|inconsistencies| {
855                serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
856                    serde_json::Value::String("failed to serialize inconsistencies".to_string())
857                })
858            })
859    }
860
    /// Dumps the Coordinator's internal state for debugging.
    ///
    /// NOTE(review): presumably internal-only like the neighboring dump/check
    /// endpoints — confirm authorization expectations at call sites.
    pub async fn dump_coordinator_state(&self) -> Result<serde_json::Value, anyhow::Error> {
        self.send_without_session(|tx| Command::Dump { tx }).await
    }
864
865    /// Tells the coordinator a statement has finished execution, in the cases
866    /// where we have no other reason to communicate with the coordinator.
867    pub fn retire_execute(
868        &self,
869        guard: ExecuteContextGuard,
870        reason: StatementEndedExecutionReason,
871    ) {
872        if !guard.is_trivial() {
873            let data = guard.defuse();
874            let cmd = Command::RetireExecute { data, reason };
875            self.inner().send(cmd);
876        }
877    }
878
    /// Inserts a set of rows into the given table.
    ///
    /// The rows only contain the columns positions in `columns`, so they
    /// must be re-encoded for adding the default values for the remaining
    /// ones.
    ///
    /// Plans the insert as a COPY FROM, optimizes the resulting expression to
    /// a constant, and hands it to the Coordinator's constant-insert path.
    /// The `ctx_extra` guard is always retired, regardless of outcome.
    pub async fn insert_rows(
        &mut self,
        target_id: CatalogItemId,
        target_name: String,
        columns: Vec<ColumnIndex>,
        rows: Vec<Row>,
        ctx_extra: ExecuteContextGuard,
    ) -> Result<ExecuteResponse, AdapterError> {
        // TODO: Remove this clone once we always have the session. It's currently needed because
        // self.session returns a mut ref, so we can't call it twice.
        let pcx = self.session().pcx().clone();

        let session_meta = self.session().meta();

        let catalog = self.catalog_snapshot("insert_rows").await;
        let conn_catalog = catalog.for_session(self.session());
        let catalog_state = conn_catalog.state();

        // Collect optimizer parameters.
        let optimizer_config = optimize::OptimizerConfig::from(conn_catalog.system_vars());
        // Expression preparation with no logical timestamp available: the rows
        // being inserted must not depend on query-time evaluation.
        let prep = ExprPrepOneShot {
            logical_time: EvalTime::NotAvailable,
            session: &session_meta,
            catalog_state,
        };
        let mut optimizer =
            optimize::view::Optimizer::new_with_prep_no_limit(optimizer_config.clone(), None, prep);

        // Plan, optimize, then insert — each stage short-circuits on error via
        // `err_into`/`and_then` so the guard below still gets retired.
        let result: Result<_, AdapterError> = mz_sql::plan::plan_copy_from(
            &pcx,
            &conn_catalog,
            target_id,
            target_name,
            columns,
            rows,
        )
        .err_into()
        .and_then(|values| optimizer.optimize(values).err_into())
        .and_then(|values| {
            // Copied rows must always be constants.
            Coordinator::insert_constant(&catalog, self.session(), target_id, values.into_inner())
        });
        // Report statement completion (success or failure) for statement logging.
        self.retire_execute(ctx_extra, (&result).into());
        result
    }
929
    /// Gets the current value of all system variables.
    ///
    /// Delegates to the inner [`Client`].
    pub async fn get_system_vars(&self) -> SystemVars {
        self.inner().get_system_vars().await
    }
934
935    /// Updates the specified system variables to the specified values.
936    pub async fn set_system_vars(
937        &mut self,
938        vars: BTreeMap<String, String>,
939    ) -> Result<(), AdapterError> {
940        let conn_id = self.session().conn_id().clone();
941        self.send_without_session(|tx| Command::SetSystemVars { vars, conn_id, tx })
942            .await
943    }
944
945    /// Terminates the client session.
946    pub async fn terminate(&mut self) {
947        let conn_id = self.session().conn_id().clone();
948        let res = self
949            .send_without_session(|tx| Command::Terminate {
950                conn_id,
951                tx: Some(tx),
952            })
953            .await;
954        if let Err(e) = res {
955            // Nothing we can do to handle a failed terminate so we just log and ignore it.
956            error!("Unable to terminate session: {e:?}");
957        }
958        // Prevent any communication with Coordinator after session is terminated.
959        self.inner = None;
960    }
961
    /// Returns a mutable reference to the session bound to this client.
    ///
    /// # Panics
    ///
    /// Panics if the session is currently checked out (e.g. while a command
    /// that carries the session is in flight).
    pub fn session(&mut self) -> &mut Session {
        self.session.as_mut().expect("session invariant violated")
    }
966
    /// Returns a reference to the inner client.
    ///
    /// # Panics
    ///
    /// Panics if the connection to the Coordinator has been severed, which
    /// happens after [`SessionClient::terminate`] sets `self.inner` to `None`.
    pub fn inner(&self) -> &Client {
        self.inner.as_ref().expect("inner invariant violated")
    }
971
    /// Builds a [`Command`] with a fresh oneshot response channel, sends it to
    /// the Coordinator, and awaits the response.
    ///
    /// Unlike [`SessionClient::send`], the command does not carry the
    /// `Session`, so this only needs `&self`.
    ///
    /// # Panics
    ///
    /// Panics if the Coordinator drops the response sender without replying.
    async fn send_without_session<T, F>(&self, f: F) -> T
    where
        F: FnOnce(oneshot::Sender<T>) -> Command,
    {
        let (tx, rx) = oneshot::channel();
        self.inner().send(f(tx));
        rx.await.expect("sender dropped")
    }
980
    /// Sends a [`Command`] carrying the session to the Coordinator and awaits
    /// its response. See [`SessionClient::send_with_cancel`] for how the
    /// session is returned to `self`.
    #[instrument(level = "debug")]
    async fn send<T, F>(&mut self, f: F) -> Result<T, AdapterError>
    where
        F: FnOnce(oneshot::Sender<Response<T>>, Session) -> Command,
    {
        // A forever-pending cancel future means this command cannot be cancelled.
        self.send_with_cancel(f, futures::future::pending()).await
    }
988
    /// Send a [`Command`] to the Coordinator, with the ability to cancel the command.
    ///
    /// The session is taken out of `self` to travel with the command and is
    /// put back when the response arrives (or via the guard below if this
    /// future is dropped concurrently with the response).
    ///
    /// Note: the provided `cancel_future` must be cancel-safe as it's polled in a `select!` loop.
    #[instrument(level = "debug")]
    async fn send_with_cancel<T, F>(
        &mut self,
        f: F,
        cancel_future: impl Future<Output = std::io::Error> + Send,
    ) -> Result<T, AdapterError>
    where
        F: FnOnce(oneshot::Sender<Response<T>>, Session) -> Command,
    {
        let session = self.session.take().expect("session invariant violated");
        let mut typ = None;
        let application_name = session.application_name();
        let name_hint = ApplicationNameHint::from_str(application_name);
        let conn_id = session.conn_id().clone();
        let (tx, rx) = oneshot::channel();

        // Destructure self so we can hold a mutable reference to the inner client and session at
        // the same time.
        let Self {
            inner: inner_client,
            session: client_session,
            ..
        } = self;

        // TODO(parkmycar): Leaking this invariant here doesn't feel great, but calling
        // `self.client()` doesn't work because then Rust takes a borrow on the entirity of self.
        let inner_client = inner_client.as_ref().expect("inner invariant violated");

        // ~~SPOOKY ZONE~~
        //
        // This guard prevents a race where a `Session` is returned on `rx` but never placed
        // back in `self` because the Future returned by this function is concurrently dropped
        // with the Coordinator sending a response.
        let mut guarded_rx = rx.with_guard(|response: Response<_>| {
            *client_session = Some(response.session);
        });

        inner_client.send({
            let cmd = f(tx, session);
            // Measure the success and error rate of certain commands:
            // - declare reports success of SQL statement planning
            // - execute reports success of dataflow execution
            match cmd {
                Command::Execute { .. } => typ = Some("execute"),
                Command::GetWebhook { .. } => typ = Some("webhook"),
                Command::Startup { .. }
                | Command::AuthenticatePassword { .. }
                | Command::AuthenticateGetSASLChallenge { .. }
                | Command::AuthenticateVerifySASLProof { .. }
                | Command::CatalogSnapshot { .. }
                | Command::Commit { .. }
                | Command::CancelRequest { .. }
                | Command::PrivilegedCancelRequest { .. }
                | Command::GetSystemVars { .. }
                | Command::SetSystemVars { .. }
                | Command::Terminate { .. }
                | Command::RetireExecute { .. }
                | Command::CheckConsistency { .. }
                | Command::Dump { .. }
                | Command::GetComputeInstanceClient { .. }
                | Command::GetOracle { .. }
                | Command::DetermineRealTimeRecentTimestamp { .. }
                | Command::GetTransactionReadHoldsBundle { .. }
                | Command::StoreTransactionReadHolds { .. }
                | Command::ExecuteSlowPathPeek { .. }
                | Command::CopyToPreflight { .. }
                | Command::ExecuteCopyTo { .. }
                | Command::ExecuteSideEffectingFunc { .. }
                | Command::RegisterFrontendPeek { .. }
                | Command::UnregisterFrontendPeek { .. }
                | Command::ExplainTimestamp { .. }
                | Command::FrontendStatementLogging(..) => {}
            };
            cmd
        });

        let mut cancel_future = pin::pin!(cancel_future);
        // Only send the cancel request once; after that we keep waiting for
        // the Coordinator's response so the session is returned to us.
        let mut cancelled = false;
        loop {
            tokio::select! {
                res = &mut guarded_rx => {
                    // We received a result, so drop our guard to drop our borrows.
                    drop(guarded_rx);

                    let res = res.expect("sender dropped");
                    let status = res.result.is_ok().then_some("success").unwrap_or("error");
                    if let Err(err) = res.result.as_ref() {
                        if name_hint.should_trace_errors() {
                            tracing::warn!(?err, ?name_hint, "adapter response error");
                        }
                    }

                    if let Some(typ) = typ {
                        inner_client
                            .metrics
                            .commands
                            .with_label_values(&[typ, status, name_hint.as_str()])
                            .inc();
                    }
                    *client_session = Some(res.session);
                    return res.result;
                },
                _err = &mut cancel_future, if !cancelled => {
                    cancelled = true;
                    inner_client.send(Command::PrivilegedCancelRequest {
                        conn_id: conn_id.clone(),
                    });
                }
            };
        }
    }
1103
    /// Arms an idle-in-transaction timeout for the session's current
    /// transaction, if the `idle_in_transaction_session_timeout` session var
    /// is nonzero and a transaction is in progress.
    pub fn add_idle_in_transaction_session_timeout(&mut self) {
        let session = self.session();
        let timeout_dur = session.vars().idle_in_transaction_session_timeout();
        if !timeout_dur.is_zero() {
            // NOTE(review): if the getter returns `Duration` by value this
            // clone is redundant (Duration is Copy); if it returns a borrow,
            // the clone ends that borrow before `self.timeouts` is used below
            // — confirm before removing.
            let timeout_dur = timeout_dur.clone();
            if let Some(txn) = session.transaction().inner() {
                let txn_id = txn.id.clone();
                let timeout = TimeoutType::IdleInTransactionSession(txn_id);
                self.timeouts.add_timeout(timeout, timeout_dur);
            }
        }
    }
1116
1117    pub fn remove_idle_in_transaction_session_timeout(&mut self) {
1118        let session = self.session();
1119        if let Some(txn) = session.transaction().inner() {
1120            let txn_id = txn.id.clone();
1121            self.timeouts
1122                .remove_timeout(&TimeoutType::IdleInTransactionSession(txn_id));
1123        }
1124    }
1125
    /// Waits for the next armed session timeout to fire, if any.
    ///
    /// # Cancel safety
    ///
    /// This method is cancel safe. If `recv` is used as the event in a
    /// `tokio::select!` statement and some other branch
    /// completes first, it is guaranteed that no messages were received on this
    /// channel.
    pub async fn recv_timeout(&mut self) -> Option<TimeoutType> {
        self.timeouts.recv().await
    }
1135
    /// Returns a shared reference to the PeekClient used for frontend peek sequencing.
    pub fn peek_client(&self) -> &PeekClient {
        &self.peek_client
    }
1140
    /// Returns a mutable reference to the PeekClient used for frontend peek sequencing.
    pub fn peek_client_mut(&mut self) -> &mut PeekClient {
        &mut self.peek_client
    }
1145
1146    /// Attempt to sequence a peek from the session task.
1147    ///
1148    /// Returns `Ok(Some(response))` if we handled the peek, or `Ok(None)` to fall back to the
1149    /// Coordinator's sequencing. If it returns an error, it should be returned to the user.
1150    ///
1151    /// `outer_ctx_extra` is Some when we are executing as part of an outer statement, e.g., a FETCH
1152    /// triggering the execution of the underlying query.
1153    pub(crate) async fn try_frontend_peek(
1154        &mut self,
1155        portal_name: &str,
1156        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
1157    ) -> Result<Option<ExecuteResponse>, AdapterError> {
1158        if self.enable_frontend_peek_sequencing {
1159            let session = self.session.as_mut().expect("SessionClient invariant");
1160            self.peek_client
1161                .try_frontend_peek(portal_name, session, outer_ctx_extra)
1162                .await
1163        } else {
1164            Ok(None)
1165        }
1166    }
1167}
1168
1169impl Drop for SessionClient {
1170    fn drop(&mut self) {
1171        // We may not have a session if this client was dropped while awaiting
1172        // a response. In this case, it is the coordinator's responsibility to
1173        // terminate the session.
1174        if let Some(session) = self.session.take() {
1175            // We may not have a connection to the Coordinator if the session was
1176            // prematurely terminated, for example due to a timeout.
1177            if let Some(inner) = &self.inner {
1178                inner.send(Command::Terminate {
1179                    conn_id: session.conn_id().clone(),
1180                    tx: None,
1181                })
1182            }
1183        }
1184    }
1185}
1186
/// The kinds of session-level timeouts that can fire.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
pub enum TimeoutType {
    /// The session sat idle inside the identified transaction for longer than
    /// the configured `idle_in_transaction_session_timeout`.
    IdleInTransactionSession(TransactionId),
}
1191
impl Display for TimeoutType {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            TimeoutType::IdleInTransactionSession(txn_id) => {
                // NOTE: `writeln!` appends a trailing newline, so `to_string`
                // output ends with '\n'. This is also used as a task name in
                // `Timeout::add_timeout`.
                writeln!(f, "Idle in transaction session for transaction '{txn_id}'")
            }
        }
    }
}
1201
1202impl From<TimeoutType> for AdapterError {
1203    fn from(timeout: TimeoutType) -> Self {
1204        match timeout {
1205            TimeoutType::IdleInTransactionSession(_) => {
1206                AdapterError::IdleInTransactionSessionTimeout
1207            }
1208        }
1209    }
1210}
1211
/// Tracks pending session timeouts as spawned sleeper tasks that report back
/// over an in-struct channel.
struct Timeout {
    /// Sender cloned into each sleeper task to report a fired timeout.
    tx: mpsc::UnboundedSender<TimeoutType>,
    /// Receives fired timeouts; drained by `recv` and `remove_timeout`.
    rx: mpsc::UnboundedReceiver<TimeoutType>,
    /// Handles to pending sleeper tasks; a handle's task is aborted when the
    /// entry is removed or this struct is dropped.
    active_timeouts: BTreeMap<TimeoutType, AbortOnDropHandle<()>>,
}
1217
1218impl Timeout {
1219    fn new() -> Self {
1220        let (tx, rx) = mpsc::unbounded_channel();
1221        Timeout {
1222            tx,
1223            rx,
1224            active_timeouts: BTreeMap::new(),
1225        }
1226    }
1227
1228    /// # Cancel safety
1229    ///
1230    /// This method is cancel safe. If `recv` is used as the event in a
1231    /// `tokio::select!` statement and some other branch
1232    /// completes first, it is guaranteed that no messages were received on this
1233    /// channel.
1234    ///
1235    /// <https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.UnboundedReceiver.html#cancel-safety>
1236    async fn recv(&mut self) -> Option<TimeoutType> {
1237        self.rx.recv().await
1238    }
1239
1240    fn add_timeout(&mut self, timeout: TimeoutType, duration: Duration) {
1241        let tx = self.tx.clone();
1242        let timeout_key = timeout.clone();
1243        let handle = mz_ore::task::spawn(|| format!("{timeout_key}"), async move {
1244            tokio::time::sleep(duration).await;
1245            let _ = tx.send(timeout);
1246        })
1247        .abort_on_drop();
1248        self.active_timeouts.insert(timeout_key, handle);
1249    }
1250
1251    fn remove_timeout(&mut self, timeout: &TimeoutType) {
1252        self.active_timeouts.remove(timeout);
1253
1254        // Remove the timeout from the rx queue if it exists.
1255        let mut timeouts = Vec::new();
1256        while let Ok(pending_timeout) = self.rx.try_recv() {
1257            if timeout != &pending_timeout {
1258                timeouts.push(pending_timeout);
1259            }
1260        }
1261        for pending_timeout in timeouts {
1262            self.tx.send(pending_timeout).expect("rx is in this struct");
1263        }
1264    }
1265}
1266
/// A wrapper around a stream of [`PeekResponseUnary`] that records, in the
/// given histogram, the elapsed time when it sees the first row data. It also
/// keeps track of whether we have already observed the end of the underlying
/// stream.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct RecordFirstRowStream {
    /// The underlying stream of rows.
    #[derivative(Debug = "ignore")]
    pub rows: Box<dyn Stream<Item = PeekResponseUnary> + Unpin + Send + Sync>,
    /// The Instant when execution started.
    pub execute_started: Instant,
    /// The histogram where the time since `execute_started` will be recorded when we see the first
    /// row.
    pub time_to_first_row_seconds: Histogram,
    /// Whether we've seen any rows.
    pub saw_rows: bool,
    /// The Instant when we saw the first row.
    pub recorded_first_row_instant: Option<Instant>,
    /// Whether we have already observed the end of the underlying stream.
    pub no_more_rows: bool,
}
1288
1289impl RecordFirstRowStream {
1290    /// Create a new [`RecordFirstRowStream`]
1291    pub fn new(
1292        rows: Box<dyn Stream<Item = PeekResponseUnary> + Unpin + Send + Sync>,
1293        execute_started: Instant,
1294        client: &SessionClient,
1295        instance_id: Option<ComputeInstanceId>,
1296        strategy: Option<StatementExecutionStrategy>,
1297    ) -> Self {
1298        let histogram = Self::histogram(client, instance_id, strategy);
1299        Self {
1300            rows,
1301            execute_started,
1302            time_to_first_row_seconds: histogram,
1303            saw_rows: false,
1304            recorded_first_row_instant: None,
1305            no_more_rows: false,
1306        }
1307    }
1308
1309    fn histogram(
1310        client: &SessionClient,
1311        instance_id: Option<ComputeInstanceId>,
1312        strategy: Option<StatementExecutionStrategy>,
1313    ) -> Histogram {
1314        let isolation_level = *client
1315            .session
1316            .as_ref()
1317            .expect("session invariant")
1318            .vars()
1319            .transaction_isolation();
1320        let instance = match instance_id {
1321            Some(i) => Cow::Owned(i.to_string()),
1322            None => Cow::Borrowed("none"),
1323        };
1324        let strategy = match strategy {
1325            Some(s) => s.name(),
1326            None => "none",
1327        };
1328
1329        client
1330            .inner()
1331            .metrics()
1332            .time_to_first_row_seconds
1333            .with_label_values(&[instance.as_ref(), isolation_level.as_str(), strategy])
1334    }
1335
1336    /// If you want to match [`RecordFirstRowStream`]'s logic but don't need
1337    /// a UnboundedReceiver, you can tell it when to record an observation.
1338    pub fn record(
1339        execute_started: Instant,
1340        client: &SessionClient,
1341        instance_id: Option<ComputeInstanceId>,
1342        strategy: Option<StatementExecutionStrategy>,
1343    ) {
1344        Self::histogram(client, instance_id, strategy)
1345            .observe(execute_started.elapsed().as_secs_f64());
1346    }
1347
1348    pub async fn recv(&mut self) -> Option<PeekResponseUnary> {
1349        let msg = self.rows.next().await;
1350        if !self.saw_rows && matches!(msg, Some(PeekResponseUnary::Rows(_))) {
1351            self.saw_rows = true;
1352            self.time_to_first_row_seconds
1353                .observe(self.execute_started.elapsed().as_secs_f64());
1354            self.recorded_first_row_instant = Some(Instant::now());
1355        }
1356        if msg.is_none() {
1357            self.no_more_rows = true;
1358        }
1359        msg
1360    }
1361}