Skip to main content

mz_adapter/
session.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Per-connection configuration parameters and state.
11
12#![warn(missing_docs)]
13
14use std::collections::btree_map::Entry;
15use std::collections::{BTreeMap, BTreeSet};
16use std::future::Future;
17use std::mem;
18use std::net::IpAddr;
19use std::pin::Pin;
20use std::sync::Arc;
21
22use chrono::{DateTime, Utc};
23use derivative::Derivative;
24use itertools::Itertools;
25use mz_adapter_types::connection::ConnectionId;
26use mz_auth::AuthenticatorKind;
27use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
28use mz_controller_types::ClusterId;
29use mz_ore::metrics::{MetricsFutureExt, MetricsRegistry};
30use mz_ore::now::{EpochMillis, NowFn};
31use mz_pgwire_common::Format;
32use mz_repr::role_id::RoleId;
33use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
34use mz_repr::{CatalogItemId, Datum, Row, RowIterator, SqlScalarType, Timestamp};
35use mz_sql::ast::{AstInfo, Raw, Statement, TransactionAccessMode};
36use mz_sql::plan::{Params, PlanContext, QueryWhen, StatementDesc};
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::user::{
39    INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER, RoleMetadata, SYSTEM_USER, User,
40};
41use mz_sql::session::vars::IsolationLevel;
42pub use mz_sql::session::vars::{
43    DEFAULT_DATABASE_NAME, EndTransactionAction, SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION,
44    SERVER_PATCH_VERSION, SessionVars, Var,
45};
46use mz_sql_parser::ast::TransactionIsolationLevel;
47use mz_storage_client::client::TableData;
48use mz_storage_types::sources::Timeline;
49use qcell::{QCell, QCellOwner};
50use timely::progress::Timestamp as _;
51use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
52use tokio::sync::watch;
53use uuid::Uuid;
54
55use crate::catalog::CatalogState;
56use crate::client::RecordFirstRowStream;
57use crate::coord::appends::BuiltinTableAppendNotify;
58use crate::coord::in_memory_oracle::InMemoryTimestampOracle;
59use crate::coord::peek::PeekResponseUnary;
60use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
61use crate::coord::{Coordinator, ExplainContext};
62use crate::error::AdapterError;
63use crate::metrics::{Metrics, SessionMetrics};
64use crate::statement_logging::PreparedStatementLoggingInfo;
65use crate::{AdapterNotice, ExecuteContext};
66use mz_catalog::durable::Snapshot;
67
68const DUMMY_CONNECTION_ID: ConnectionId = ConnectionId::Static(0);
69
70/// A session holds per-connection state.
71#[derive(Derivative)]
72#[derivative(Debug)]
73pub struct Session {
74    conn_id: ConnectionId,
75    /// A globally unique identifier for the session. Not to be confused
76    /// with `conn_id`, which may be reused.
77    uuid: Uuid,
78    prepared_statements: BTreeMap<String, PreparedStatement>,
79    portals: BTreeMap<String, Portal>,
80    transaction: TransactionStatus,
81    pcx: Option<PlanContext>,
82    metrics: SessionMetrics,
83    #[derivative(Debug = "ignore")]
84    builtin_updates: Option<BuiltinTableAppendNotify>,
85
86    /// The role metadata of the current session.
87    ///
88    /// Invariant: role_metadata must be `Some` after the user has
89    /// successfully connected to and authenticated with Materialize.
90    ///
91    /// Prefer using this value over [`SessionConfig::user`].
92    //
93    // It would be better for this not to be an Option, but the
94    // `Session` is initialized before the user has connected to
95    // Materialize and is able to look up the `RoleMetadata`. The `Session`
96    // is also used to return an error when no role exists and
97    // therefore there is no valid `RoleMetadata`.
98    role_metadata: Option<RoleMetadata>,
99    client_ip: Option<IpAddr>,
100    vars: SessionVars,
101    notices_tx: mpsc::UnboundedSender<AdapterNotice>,
102    notices_rx: mpsc::UnboundedReceiver<AdapterNotice>,
103    next_transaction_id: TransactionId,
104    secret_key: u32,
105    external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
106    // Token allowing us to access `Arc<QCell<StatementLogging>>`
107    // metadata. We want these to be reference-counted, because the same
108    // statement might be referenced from multiple portals simultaneously.
109    //
110    // However, they can't be `Rc<RefCell<StatementLogging>>`, because
111    // the `Session` is sent around to different threads.
112    //
113    // On the other hand, they don't need to be
114    // `Arc<Mutex<StatementLogging>>`, because they will always be
115    // accessed from the same thread that the `Session` is currently
116    // on. We express this by gating access with this token.
117    #[derivative(Debug = "ignore")]
118    qcell_owner: QCellOwner,
119    session_oracles: BTreeMap<Timeline, InMemoryTimestampOracle>,
120    /// Incremented when session state that is relevant to prepared statement planning changes.
121    /// Currently, only changes to `portals` are tracked. Changes to `prepared_statements` don't
122    /// need to be tracked, because prepared statements can't depend on other prepared statements.
123    /// TODO: We might want to track changes also to session variables.
124    /// (`Catalog::transient_revision` similarly tracks changes on the catalog side.)
125    state_revision: u64,
126}
127
128impl SessionMetadata for Session {
129    fn conn_id(&self) -> &ConnectionId {
130        &self.conn_id
131    }
132
133    fn client_ip(&self) -> Option<&IpAddr> {
134        self.client_ip.as_ref()
135    }
136
137    fn pcx(&self) -> &PlanContext {
138        &self
139            .transaction()
140            .inner()
141            .expect("no active transaction")
142            .pcx
143    }
144
145    fn role_metadata(&self) -> &RoleMetadata {
146        self.role_metadata
147            .as_ref()
148            .expect("role_metadata invariant violated")
149    }
150
151    fn vars(&self) -> &SessionVars {
152        &self.vars
153    }
154}
155
156/// Data structure suitable for passing to other threads that need access to some common Session
157/// properties.
158#[derive(Debug)]
159pub struct SessionMeta {
160    conn_id: ConnectionId,
161    client_ip: Option<IpAddr>,
162    pcx: PlanContext,
163    role_metadata: RoleMetadata,
164    vars: SessionVars,
165}
166
167impl SessionMetadata for SessionMeta {
168    fn vars(&self) -> &SessionVars {
169        &self.vars
170    }
171
172    fn conn_id(&self) -> &ConnectionId {
173        &self.conn_id
174    }
175
176    fn client_ip(&self) -> Option<&IpAddr> {
177        self.client_ip.as_ref()
178    }
179
180    fn pcx(&self) -> &PlanContext {
181        &self.pcx
182    }
183
184    fn role_metadata(&self) -> &RoleMetadata {
185        &self.role_metadata
186    }
187}
188
189/// Configures a new [`Session`].
190#[derive(Debug, Clone)]
191pub struct SessionConfig {
192    /// The connection ID for the session.
193    ///
194    /// May be reused after the session terminates.
195    pub conn_id: ConnectionId,
196    /// A universally unique identifier for the session, across all processes,
197    /// region, and all time.
198    ///
199    /// Must not be reused, even after the session terminates.
200    pub uuid: Uuid,
201    /// The peer address of the client
202    pub client_ip: Option<IpAddr>,
203    /// The name of the user associated with the session.
204    pub user: String,
205    /// An optional receiver that the session will periodically check for
206    /// updates to a user's external metadata.
207    pub external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
208    /// Helm chart version
209    pub helm_chart_version: Option<String>,
210    /// The authenticator that authenticated this user, if any.
211    pub authenticator_kind: AuthenticatorKind,
212}
213
214impl Session {
215    /// Creates a new session for the specified connection ID.
216    pub(crate) fn new(
217        build_info: &'static BuildInfo,
218        config: SessionConfig,
219        metrics: SessionMetrics,
220    ) -> Session {
221        assert_ne!(config.conn_id, DUMMY_CONNECTION_ID);
222        Self::new_internal(build_info, config, metrics)
223    }
224
225    /// Returns a reference-less collection of data usable by other tasks that don't have ownership
226    /// of the Session.
227    pub fn meta(&self) -> SessionMeta {
228        SessionMeta {
229            conn_id: self.conn_id().clone(),
230            client_ip: self.client_ip().copied(),
231            pcx: self.pcx().clone(),
232            role_metadata: self.role_metadata().clone(),
233            vars: self.vars.clone(),
234        }
235
236        // TODO: soft_assert that these are the same as Session.
237    }
238
239    /// Creates new statement logging metadata for a one-off
240    /// statement.
241    // Normally, such logging information would be created as part of
242    // allocating a new prepared statement, and a refcounted handle
243    // would be copied from that prepared statement to portals during
244    // binding. However, we also support (via `Command::declare`)
245    // binding a statement directly to a portal without creating an
246    // intermediate prepared statement. Thus, for those cases, a
247    // mechanism for generating the logging metadata directly is needed.
248    pub(crate) fn mint_logging<A: AstInfo>(
249        &self,
250        raw_sql: String,
251        stmt: Option<&Statement<A>>,
252        now: EpochMillis,
253    ) -> Arc<QCell<PreparedStatementLoggingInfo>> {
254        Arc::new(QCell::new(
255            &self.qcell_owner,
256            PreparedStatementLoggingInfo::still_to_log(
257                raw_sql,
258                stmt,
259                now,
260                "".to_string(),
261                self.uuid,
262                false,
263            ),
264        ))
265    }
266
267    pub(crate) fn qcell_ro<'a, T2: 'a>(&'a self, cell: &'a Arc<QCell<T2>>) -> &'a T2 {
268        self.qcell_owner.ro(&*cell)
269    }
270
271    pub(crate) fn qcell_rw<'a, T2: 'a>(&'a mut self, cell: &'a Arc<QCell<T2>>) -> &'a mut T2 {
272        self.qcell_owner.rw(&*cell)
273    }
274
275    /// Returns a unique ID for the session.
276    /// Not to be confused with `connection_id`, which can be reused.
277    pub fn uuid(&self) -> Uuid {
278        self.uuid
279    }
280
281    /// Creates a new dummy session.
282    ///
283    /// Dummy sessions are intended for use when executing queries on behalf of
284    /// the system itself, rather than on behalf of a user.
285    pub fn dummy() -> Session {
286        let registry = MetricsRegistry::new();
287        let metrics = Metrics::register_into(&registry);
288        let metrics = metrics.session_metrics();
289        let mut dummy = Self::new_internal(
290            &DUMMY_BUILD_INFO,
291            SessionConfig {
292                conn_id: DUMMY_CONNECTION_ID,
293                uuid: Uuid::new_v4(),
294                user: SYSTEM_USER.name.clone(),
295                client_ip: None,
296                external_metadata_rx: None,
297                helm_chart_version: None,
298                authenticator_kind: AuthenticatorKind::None,
299            },
300            metrics,
301        );
302        dummy.initialize_role_metadata(RoleId::User(0));
303        dummy
304    }
305
306    fn new_internal(
307        build_info: &'static BuildInfo,
308        SessionConfig {
309            conn_id,
310            uuid,
311            user,
312            client_ip,
313            mut external_metadata_rx,
314            helm_chart_version,
315            authenticator_kind,
316        }: SessionConfig,
317        metrics: SessionMetrics,
318    ) -> Session {
319        let (notices_tx, notices_rx) = mpsc::unbounded_channel();
320        let default_cluster = INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER.get(&user);
321        let user = User {
322            name: user,
323            internal_metadata: None,
324            external_metadata: external_metadata_rx
325                .as_mut()
326                .map(|rx| rx.borrow_and_update().clone()),
327            authenticator_kind: Some(authenticator_kind),
328        };
329        let mut vars = SessionVars::new_unchecked(build_info, user, helm_chart_version);
330        if let Some(default_cluster) = default_cluster {
331            vars.set_cluster(default_cluster.clone());
332        }
333        Session {
334            conn_id,
335            uuid,
336            transaction: TransactionStatus::Default,
337            pcx: None,
338            metrics,
339            builtin_updates: None,
340            prepared_statements: BTreeMap::new(),
341            portals: BTreeMap::new(),
342            role_metadata: None,
343            client_ip,
344            vars,
345            notices_tx,
346            notices_rx,
347            next_transaction_id: 0,
348            secret_key: rand::random(),
349            external_metadata_rx,
350            qcell_owner: QCellOwner::new(),
351            session_oracles: BTreeMap::new(),
352            state_revision: 0,
353        }
354    }
355
356    /// Returns the secret key associated with the session.
357    pub fn secret_key(&self) -> u32 {
358        self.secret_key
359    }
360
361    fn new_pcx(&self, mut wall_time: DateTime<Utc>) -> PlanContext {
362        if let Some(mock_time) = self.vars().unsafe_new_transaction_wall_time() {
363            wall_time = *mock_time;
364        }
365        PlanContext::new(wall_time)
366    }
367
368    /// Starts an explicit transaction, or changes an implicit to an explicit
369    /// transaction.
370    pub fn start_transaction(
371        &mut self,
372        wall_time: DateTime<Utc>,
373        access: Option<TransactionAccessMode>,
374        isolation_level: Option<TransactionIsolationLevel>,
375    ) -> Result<(), AdapterError> {
376        // Check that current transaction state is compatible with new `access`
377        if let Some(txn) = self.transaction.inner() {
378            // `READ WRITE` prohibited if:
379            // - Currently in `READ ONLY`
380            // - Already performed a query
381            let read_write_prohibited = match txn.ops {
382                TransactionOps::Peeks { .. } | TransactionOps::Subscribe => {
383                    txn.access == Some(TransactionAccessMode::ReadOnly)
384                }
385                TransactionOps::None
386                | TransactionOps::Writes(_)
387                | TransactionOps::SingleStatement { .. }
388                | TransactionOps::DDL { .. } => false,
389            };
390
391            if read_write_prohibited && access == Some(TransactionAccessMode::ReadWrite) {
392                return Err(AdapterError::ReadWriteUnavailable);
393            }
394        }
395
396        match std::mem::take(&mut self.transaction) {
397            TransactionStatus::Default => {
398                let id = self.next_transaction_id;
399                self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
400                self.transaction = TransactionStatus::InTransaction(Transaction {
401                    pcx: self.new_pcx(wall_time),
402                    ops: TransactionOps::None,
403                    write_lock_guards: None,
404                    access,
405                    id,
406                });
407            }
408            TransactionStatus::Started(mut txn)
409            | TransactionStatus::InTransactionImplicit(mut txn)
410            | TransactionStatus::InTransaction(mut txn) => {
411                if access.is_some() {
412                    txn.access = access;
413                }
414                self.transaction = TransactionStatus::InTransaction(txn);
415            }
416            TransactionStatus::Failed(_) => unreachable!(),
417        };
418
419        if let Some(isolation_level) = isolation_level {
420            self.vars
421                .set_local_transaction_isolation(isolation_level.into());
422        }
423
424        Ok(())
425    }
426
427    /// Starts either a single statement or implicit transaction based on the
428    /// number of statements, but only if no transaction has been started already.
429    pub fn start_transaction_implicit(&mut self, wall_time: DateTime<Utc>, stmts: usize) {
430        if let TransactionStatus::Default = self.transaction {
431            let id = self.next_transaction_id;
432            self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
433            let txn = Transaction {
434                pcx: self.new_pcx(wall_time),
435                ops: TransactionOps::None,
436                write_lock_guards: None,
437                access: None,
438                id,
439            };
440            match stmts {
441                1 => self.transaction = TransactionStatus::Started(txn),
442                n if n > 1 => self.transaction = TransactionStatus::InTransactionImplicit(txn),
443                _ => {}
444            }
445        }
446    }
447
448    /// Starts a single statement transaction, but only if no transaction has been started already.
449    pub fn start_transaction_single_stmt(&mut self, wall_time: DateTime<Utc>) {
450        self.start_transaction_implicit(wall_time, 1);
451    }
452
453    /// Clears a transaction, setting its state to Default and destroying all
454    /// portals. Returned are:
455    /// - sinks that were started in this transaction and need to be dropped
456    /// - the cleared transaction so its operations can be handled
457    ///
458    /// The [Postgres protocol docs](https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) specify:
459    /// > a named portal object lasts till the end of the current transaction
460    /// and
461    /// > An unnamed portal is destroyed at the end of the transaction
462    #[must_use]
463    pub fn clear_transaction(&mut self) -> TransactionStatus {
464        self.portals.clear();
465        self.pcx = None;
466        self.state_revision += 1;
467        mem::take(&mut self.transaction)
468    }
469
470    /// Marks the current transaction as failed.
471    pub fn fail_transaction(mut self) -> Self {
472        match self.transaction {
473            TransactionStatus::Default => unreachable!(),
474            TransactionStatus::Started(txn)
475            | TransactionStatus::InTransactionImplicit(txn)
476            | TransactionStatus::InTransaction(txn) => {
477                self.transaction = TransactionStatus::Failed(txn);
478            }
479            TransactionStatus::Failed(_) => {}
480        };
481        self
482    }
483
484    /// Returns the current transaction status.
485    pub fn transaction(&self) -> &TransactionStatus {
486        &self.transaction
487    }
488
489    /// Returns the current transaction status.
490    pub fn transaction_mut(&mut self) -> &mut TransactionStatus {
491        &mut self.transaction
492    }
493
494    /// Returns the session's transaction code.
495    pub fn transaction_code(&self) -> TransactionCode {
496        self.transaction().into()
497    }
498
499    /// Adds operations to the current transaction. An error is produced if
500    /// they cannot be merged (i.e., a timestamp-dependent read cannot be
501    /// merged to an insert).
502    pub fn add_transaction_ops(&mut self, add_ops: TransactionOps) -> Result<(), AdapterError> {
503        self.transaction.add_ops(add_ops)
504    }
505
506    /// Returns a channel on which to send notices to the session.
507    pub fn retain_notice_transmitter(&self) -> UnboundedSender<AdapterNotice> {
508        self.notices_tx.clone()
509    }
510
511    /// Adds a notice to the session.
512    pub fn add_notice(&self, notice: AdapterNotice) {
513        self.add_notices([notice])
514    }
515
516    /// Adds multiple notices to the session.
517    pub fn add_notices(&self, notices: impl IntoIterator<Item = AdapterNotice>) {
518        for notice in notices {
519            let _ = self.notices_tx.send(notice);
520        }
521    }
522
523    /// Awaits a possible notice.
524    ///
525    /// This method is cancel safe.
526    pub async fn recv_notice(&mut self) -> AdapterNotice {
527        // This method is cancel safe because recv is cancel safe.
528        loop {
529            let notice = self
530                .notices_rx
531                .recv()
532                .await
533                .expect("Session also holds a sender, so recv won't ever return None");
534            match self.notice_filter(notice) {
535                Some(notice) => return notice,
536                None => continue,
537            }
538        }
539    }
540
541    /// Returns a draining iterator over the notices attached to the session.
542    pub fn drain_notices(&mut self) -> Vec<AdapterNotice> {
543        let mut notices = Vec::new();
544        while let Ok(notice) = self.notices_rx.try_recv() {
545            if let Some(notice) = self.notice_filter(notice) {
546                notices.push(notice);
547            }
548        }
549        notices
550    }
551
552    /// Returns Some if the notice should be reported, otherwise None.
553    fn notice_filter(&self, notice: AdapterNotice) -> Option<AdapterNotice> {
554        // Filter out low threshold severity.
555        let minimum_client_severity = self.vars.client_min_messages();
556        let sev = notice.severity();
557        if !minimum_client_severity.should_output_to_client(&sev) {
558            return None;
559        }
560        // Filter out notices for other clusters.
561        if let AdapterNotice::ClusterReplicaStatusChanged { cluster, .. } = &notice {
562            if cluster != self.vars.cluster() {
563                return None;
564            }
565        }
566        Some(notice)
567    }
568
569    /// Sets the transaction ops to `TransactionOps::None`. Must only be used after
570    /// verifying that no transaction anomalies will occur if cleared.
571    pub fn clear_transaction_ops(&mut self) {
572        if let Some(txn) = self.transaction.inner_mut() {
573            txn.ops = TransactionOps::None;
574        }
575    }
576
577    /// If the current transaction ops belong to a read, then sets the
578    /// ops to `None`, returning the old read timestamp context if
579    /// any existed. Must only be used after verifying that no transaction
580    /// anomalies will occur if cleared.
581    pub fn take_transaction_timestamp_context(&mut self) -> Option<TimestampContext> {
582        if let Some(Transaction { ops, .. }) = self.transaction.inner_mut() {
583            if let TransactionOps::Peeks { .. } = ops {
584                let ops = std::mem::take(ops);
585                Some(
586                    ops.timestamp_determination()
587                        .expect("checked above")
588                        .timestamp_context,
589                )
590            } else {
591                None
592            }
593        } else {
594            None
595        }
596    }
597
598    /// Returns the transaction's read timestamp determination, if set.
599    ///
600    /// Returns `None` if there is no active transaction, or if the active
601    /// transaction is not a read transaction.
602    pub fn get_transaction_timestamp_determination(&self) -> Option<TimestampDetermination> {
603        match self.transaction.inner() {
604            Some(Transaction {
605                pcx: _,
606                ops: TransactionOps::Peeks { determination, .. },
607                write_lock_guards: _,
608                access: _,
609                id: _,
610            }) => Some(determination.clone()),
611            _ => None,
612        }
613    }
614
615    /// Whether this session has a timestamp for a read transaction.
616    pub fn contains_read_timestamp(&self) -> bool {
617        matches!(
618            self.transaction.inner(),
619            Some(Transaction {
620                pcx: _,
621                ops: TransactionOps::Peeks {
622                    determination: TimestampDetermination {
623                        timestamp_context: TimestampContext::TimelineTimestamp { .. },
624                        ..
625                    },
626                    ..
627                },
628                write_lock_guards: _,
629                access: _,
630                id: _,
631            })
632        )
633    }
634
635    /// Registers the prepared statement under `name`.
636    pub fn set_prepared_statement(
637        &mut self,
638        name: String,
639        stmt: Option<Statement<Raw>>,
640        raw_sql: String,
641        desc: StatementDesc,
642        state_revision: StateRevision,
643        now: EpochMillis,
644    ) {
645        let logging = PreparedStatementLoggingInfo::still_to_log(
646            raw_sql,
647            stmt.as_ref(),
648            now,
649            name.clone(),
650            self.uuid,
651            false,
652        );
653        let statement = PreparedStatement {
654            stmt,
655            desc,
656            state_revision,
657            logging: Arc::new(QCell::new(&self.qcell_owner, logging)),
658        };
659        self.prepared_statements.insert(name, statement);
660    }
661
662    /// Removes the prepared statement associated with `name`.
663    ///
664    /// Returns whether a statement previously existed.
665    pub fn remove_prepared_statement(&mut self, name: &str) -> bool {
666        self.prepared_statements.remove(name).is_some()
667    }
668
669    /// Removes all prepared statements.
670    pub fn remove_all_prepared_statements(&mut self) {
671        self.prepared_statements.clear();
672    }
673
674    /// Retrieves the prepared statement associated with `name`.
675    ///
676    /// This is unverified and could be incorrect if the underlying catalog has
677    /// changed.
678    pub fn get_prepared_statement_unverified(&self, name: &str) -> Option<&PreparedStatement> {
679        self.prepared_statements.get(name)
680    }
681
682    /// Retrieves the prepared statement associated with `name`.
683    ///
684    /// This is unverified and could be incorrect if the underlying catalog has
685    /// changed.
686    pub fn get_prepared_statement_mut_unverified(
687        &mut self,
688        name: &str,
689    ) -> Option<&mut PreparedStatement> {
690        self.prepared_statements.get_mut(name)
691    }
692
693    /// Returns the prepared statements for the session.
694    pub fn prepared_statements(&self) -> &BTreeMap<String, PreparedStatement> {
695        &self.prepared_statements
696    }
697
698    /// Returns the portals for the session.
699    pub fn portals(&self) -> &BTreeMap<String, Portal> {
700        &self.portals
701    }
702
703    /// Binds the specified portal to the specified prepared statement.
704    ///
705    /// If the prepared statement contains parameters, the values and types of
706    /// those parameters must be provided in `params`. It is the caller's
707    /// responsibility to ensure that the correct number of parameters is
708    /// provided.
709    ///
710    /// The `results_formats` parameter sets the desired format of the results,
711    /// and is stored on the portal.
712    pub fn set_portal(
713        &mut self,
714        portal_name: String,
715        desc: StatementDesc,
716        stmt: Option<Statement<Raw>>,
717        logging: Arc<QCell<PreparedStatementLoggingInfo>>,
718        params: Vec<(Datum, SqlScalarType)>,
719        result_formats: Vec<Format>,
720        state_revision: StateRevision,
721    ) -> Result<(), AdapterError> {
722        // The empty portal can be silently replaced.
723        if !portal_name.is_empty() && self.portals.contains_key(&portal_name) {
724            return Err(AdapterError::DuplicateCursor(portal_name));
725        }
726        self.state_revision += 1;
727        let param_types = desc.param_types.clone();
728        self.portals.insert(
729            portal_name,
730            Portal {
731                stmt: stmt.map(Arc::new),
732                desc,
733                state_revision,
734                parameters: Params {
735                    datums: Row::pack(params.iter().map(|(d, _t)| d)),
736                    execute_types: params.into_iter().map(|(_d, t)| t).collect(),
737                    expected_types: param_types,
738                },
739                result_formats,
740                state: PortalState::NotStarted,
741                logging,
742                lifecycle_timestamps: None,
743            },
744        );
745        Ok(())
746    }
747
748    /// Removes the specified portal.
749    ///
750    /// If there is no such portal, this method does nothing. Returns whether that portal existed.
751    pub fn remove_portal(&mut self, portal_name: &str) -> bool {
752        self.state_revision += 1;
753        self.portals.remove(portal_name).is_some()
754    }
755
756    /// Retrieves a reference to the specified portal.
757    ///
758    /// If there is no such portal, returns `None`.
759    pub fn get_portal_unverified(&self, portal_name: &str) -> Option<&Portal> {
760        self.portals.get(portal_name)
761    }
762
763    /// Retrieves a mutable reference to the specified portal.
764    ///
765    /// If there is no such portal, returns `None`.
766    ///
767    /// Note: When using the returned `PortalRefMut`, there is no need to increment
768    /// `Session::state_revision`, because the portal's meaning is not changed.
769    pub fn get_portal_unverified_mut(&mut self, portal_name: &str) -> Option<PortalRefMut<'_>> {
770        self.portals.get_mut(portal_name).map(|p| PortalRefMut {
771            stmt: &p.stmt,
772            desc: &p.desc,
773            state_revision: &mut p.state_revision,
774            parameters: &mut p.parameters,
775            result_formats: &mut p.result_formats,
776            logging: &mut p.logging,
777            state: &mut p.state,
778            lifecycle_timestamps: &mut p.lifecycle_timestamps,
779        })
780    }
781
782    /// Creates and installs a new portal.
783    pub fn create_new_portal(
784        &mut self,
785        stmt: Option<Statement<Raw>>,
786        logging: Arc<QCell<PreparedStatementLoggingInfo>>,
787        desc: StatementDesc,
788        parameters: Params,
789        result_formats: Vec<Format>,
790        state_revision: StateRevision,
791    ) -> Result<String, AdapterError> {
792        self.state_revision += 1;
793
794        // See: https://github.com/postgres/postgres/blob/84f5c2908dad81e8622b0406beea580e40bb03ac/src/backend/utils/mmgr/portalmem.c#L234
795        for i in 0usize.. {
796            let name = format!("<unnamed portal {}>", i);
797            match self.portals.entry(name.clone()) {
798                Entry::Occupied(_) => continue,
799                Entry::Vacant(entry) => {
800                    entry.insert(Portal {
801                        stmt: stmt.map(Arc::new),
802                        desc,
803                        state_revision,
804                        parameters,
805                        result_formats,
806                        state: PortalState::NotStarted,
807                        logging,
808                        lifecycle_timestamps: None,
809                    });
810                    return Ok(name);
811                }
812            }
813        }
814
815        coord_bail!("unable to create a new portal");
816    }
817
818    /// Resets the session to its initial state. Returns sinks that need to be
819    /// dropped.
820    pub fn reset(&mut self) {
821        let _ = self.clear_transaction();
822        self.prepared_statements.clear();
823        self.vars.reset_all();
824    }
825
826    /// Returns the [application_name] that created this session.
827    ///
828    /// [application_name]: (https://www.postgresql.org/docs/current/runtime-config-logging.html#GUC-APPLICATION-NAME)
829    pub fn application_name(&self) -> &str {
830        self.vars.application_name()
831    }
832
833    /// Returns a reference to the variables in this session.
834    pub fn vars(&self) -> &SessionVars {
835        &self.vars
836    }
837
838    /// Returns a mutable reference to the variables in this session.
839    pub fn vars_mut(&mut self) -> &mut SessionVars {
840        &mut self.vars
841    }
842
843    /// Grants a set of write locks to this session's inner [`Transaction`].
844    ///
845    /// # Panics
846    /// If the inner transaction is idle. See [`TransactionStatus::try_grant_write_locks`].
847    ///
848    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
849        self.transaction.try_grant_write_locks(guards)
850    }
851
852    /// Drains any external metadata updates and applies the changes from the latest update.
853    pub fn apply_external_metadata_updates(&mut self) {
854        // If no sender is registered then there isn't anything to do.
855        let Some(rx) = &mut self.external_metadata_rx else {
856            return;
857        };
858
859        // If the value hasn't changed then return.
860        if !rx.has_changed().unwrap_or(false) {
861            return;
862        }
863
864        // Update our metadata! Note the short critical section (just a clone) to avoid blocking
865        // the sending side of this watch channel.
866        let metadata = rx.borrow_and_update().clone();
867        self.vars.set_external_user_metadata(metadata);
868    }
869
870    /// Applies the internal user metadata to the session.
871    pub fn apply_internal_user_metadata(&mut self, metadata: InternalUserMetadata) {
872        self.vars.set_internal_user_metadata(metadata);
873    }
874
875    /// Initializes the session's role metadata.
876    pub fn initialize_role_metadata(&mut self, role_id: RoleId) {
877        self.role_metadata = Some(RoleMetadata::new(role_id));
878    }
879
880    /// Ensures that a timestamp oracle exists for `timeline` and returns a mutable reference to
881    /// the timestamp oracle.
882    pub fn ensure_timestamp_oracle(&mut self, timeline: Timeline) -> &mut InMemoryTimestampOracle {
883        self.session_oracles.entry(timeline).or_insert_with(|| {
884            InMemoryTimestampOracle::new(Timestamp::minimum(), NowFn::from(Timestamp::minimum))
885        })
886    }
887
888    /// Ensures that a timestamp oracle exists for reads and writes from/to a local input and
889    /// returns a mutable reference to the timestamp oracle.
890    pub fn ensure_local_timestamp_oracle(&mut self) -> &mut InMemoryTimestampOracle {
891        self.ensure_timestamp_oracle(Timeline::EpochMilliseconds)
892    }
893
894    /// Returns a reference to the timestamp oracle for `timeline`.
895    pub fn get_timestamp_oracle(&self, timeline: &Timeline) -> Option<&InMemoryTimestampOracle> {
896        self.session_oracles.get(timeline)
897    }
898
899    /// If the current session is using the Strong Session Serializable isolation level advance the
900    /// session local timestamp oracle to `write_ts`.
901    pub fn apply_write(&mut self, timestamp: Timestamp) {
902        if self.vars().transaction_isolation() == &IsolationLevel::StrongSessionSerializable {
903            self.ensure_local_timestamp_oracle().apply_write(timestamp);
904        }
905    }
906
907    /// Returns the [`SessionMetrics`] instance associated with this [`Session`].
908    pub fn metrics(&self) -> &SessionMetrics {
909        &self.metrics
910    }
911
912    /// Sets the `BuiltinTableAppendNotify` for this session.
913    pub fn set_builtin_table_updates(&mut self, fut: BuiltinTableAppendNotify) {
914        let prev = self.builtin_updates.replace(fut);
915        mz_ore::soft_assert_or_log!(prev.is_none(), "replacing old builtin table notify");
916    }
917
918    /// Takes the stashed `BuiltinTableAppendNotify`, if one exists, and returns a [`Future`] that
919    /// waits for the writes to complete.
920    pub fn clear_builtin_table_updates(&mut self) -> Option<impl Future<Output = ()> + 'static> {
921        if let Some(fut) = self.builtin_updates.take() {
922            // Record how long we blocked for, if we blocked at all.
923            let histogram = self
924                .metrics()
925                .session_startup_table_writes_seconds()
926                .clone();
927            Some(async move {
928                fut.wall_time().observe(histogram).await;
929            })
930        } else {
931            None
932        }
933    }
934
935    /// Return the state_revision of the session, which can be used by dependent objects for knowing
936    /// when to re-plan due to session state changes.
937    pub fn state_revision(&self) -> u64 {
938        self.state_revision
939    }
940}
941
/// A prepared statement.
///
/// Bundles the parsed statement (absent for the empty query), its description, the state
/// revision at which that description was last verified, and a shared handle to
/// statement-logging metadata.
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct PreparedStatement {
    /// The parsed statement, or `None` if this prepared statement is the empty query.
    stmt: Option<Statement<Raw>>,
    /// The description of the statement.
    desc: StatementDesc,
    /// The most recent state revision that has verified this statement.
    pub state_revision: StateRevision,
    /// Shared handle to metadata for statement logging. Omitted from `Debug` output.
    #[derivative(Debug = "ignore")]
    logging: Arc<QCell<PreparedStatementLoggingInfo>>,
}
953
954impl PreparedStatement {
955    /// Returns the AST associated with this prepared statement,
956    /// if the prepared statement was not the empty query.
957    pub fn stmt(&self) -> Option<&Statement<Raw>> {
958        self.stmt.as_ref()
959    }
960
961    /// Returns the description of the prepared statement.
962    pub fn desc(&self) -> &StatementDesc {
963        &self.desc
964    }
965
966    /// Returns a handle to the metadata for statement logging.
967    pub fn logging(&self) -> &Arc<QCell<PreparedStatementLoggingInfo>> {
968        &self.logging
969    }
970}
971
/// A portal represents the execution state of a running or runnable query.
///
/// It binds a statement and its description to concrete parameter values and result formats,
/// and tracks execution progress in `state`. The `logging` and `state` fields are omitted from
/// `Debug` output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Portal {
    /// The statement that is bound to this portal, or `None` if there is no statement.
    pub stmt: Option<Arc<Statement<Raw>>>,
    /// The statement description.
    pub desc: StatementDesc,
    /// The most recent state revision that has verified this portal.
    pub state_revision: StateRevision,
    /// The bound values for the parameters in the prepared statement, if any.
    pub parameters: Params,
    /// The desired output format for each column in the result set.
    pub result_formats: Vec<Format>,
    /// A handle to metadata needed for statement logging.
    #[derivative(Debug = "ignore")]
    pub logging: Arc<QCell<PreparedStatementLoggingInfo>>,
    /// The execution state of the portal.
    #[derivative(Debug = "ignore")]
    pub state: PortalState,
    /// Statement lifecycle timestamps coming from `mz-pgwire`.
    pub lifecycle_timestamps: Option<LifecycleTimestamps>,
}
995
/// A mutable reference to a portal, capturing its state and associated metadata. Importantly, it
/// does _not_ give _mutable_ access to `stmt` and `desc`, which means that you do not need to
/// increment `Session::state_revision` when modifying fields through a `PortalRefMut`, because the
/// portal's meaning is not changed.
///
/// Every other field is exposed as a `&mut` borrow of the corresponding [`Portal`] field.
pub struct PortalRefMut<'a> {
    /// The statement that is bound to this portal (read-only).
    pub stmt: &'a Option<Arc<Statement<Raw>>>,
    /// The statement description (read-only).
    pub desc: &'a StatementDesc,
    /// The most recent state revision that has verified this portal.
    pub state_revision: &'a mut StateRevision,
    /// The bound values for the parameters in the prepared statement, if any.
    pub parameters: &'a mut Params,
    /// The desired output format for each column in the result set.
    pub result_formats: &'a mut Vec<Format>,
    /// A handle to metadata needed for statement logging.
    pub logging: &'a mut Arc<QCell<PreparedStatementLoggingInfo>>,
    /// The execution state of the portal.
    pub state: &'a mut PortalState,
    /// Statement lifecycle timestamps coming from `mz-pgwire`.
    pub lifecycle_timestamps: &'a mut Option<LifecycleTimestamps>,
}
1018
/// Points to a revision of catalog state and session state. When the current revisions are not the
/// same as the revisions when a prepared statement or a portal was described, we need to check
/// whether the description is still valid.
///
/// Equality is field-wise and total. Two revisions are equal only when both the catalog revision
/// and the session state revision match.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StateRevision {
    /// A revision of the catalog.
    pub catalog_revision: u64,
    /// A revision of the session state.
    pub session_state_revision: u64,
}
1029
/// Execution states of a portal.
pub enum PortalState {
    /// Portal not yet started.
    NotStarted,
    /// Portal is a rows-returning statement in progress with zero or more rows
    /// remaining.
    ///
    /// NOTE(review): this file does not show when the payload is `None`; confirm with callers.
    InProgress(Option<InProgressRows>),
    /// Portal has completed and should not be re-executed. If the optional string
    /// is present, it is returned as a CommandComplete tag; otherwise an error
    /// is sent.
    Completed(Option<String>),
}
1042
/// State of an in-progress, rows-returning portal.
///
/// Rows are handed out from `current` first. Further batches are pulled from `remaining`.
pub struct InProgressRows {
    /// The current batch of rows, or `None` if no batch is buffered.
    pub current: Option<Box<dyn RowIterator + Send + Sync>>,
    /// A stream from which to fetch more row batches.
    pub remaining: RecordFirstRowStream,
}
1050
1051impl InProgressRows {
1052    /// Creates a new InProgressRows from a batch stream.
1053    pub fn new(remaining: RecordFirstRowStream) -> Self {
1054        Self {
1055            current: None,
1056            remaining,
1057        }
1058    }
1059
1060    /// Determines whether the underlying stream has ended and there are also no more rows
1061    /// stashed in `current`.
1062    pub fn no_more_rows(&self) -> bool {
1063        self.remaining.no_more_rows && self.current.is_none()
1064    }
1065}
1066
/// A channel of batched rows: the receiving half of an unbounded channel of
/// `PeekResponseUnary` values.
pub type RowBatchStream = UnboundedReceiver<PeekResponseUnary>;
1069
/// Part of the statement lifecycle. These are timestamps that come from the Adapter frontend
/// (`mz-pgwire`) part of the lifecycle.
#[derive(Debug, Clone)]
pub struct LifecycleTimestamps {
    /// When the query was received. More specifically, when the tokio recv returned.
    /// For a Simple Query, this is for the whole query. For the Extended Query flow, this is only
    /// for `FrontendMessage::Execute`. (This means that this is after parsing for the
    /// Extended Query flow.)
    pub received: EpochMillis,
}
1080
1081impl LifecycleTimestamps {
1082    /// Creates a new `LifecycleTimestamps`.
1083    pub fn new(received: EpochMillis) -> Self {
1084        Self { received }
1085    }
1086}
1087
/// The transaction status of a session.
///
/// PostgreSQL's transaction states are in backend/access/transam/xact.c.
#[derive(Debug)]
pub enum TransactionStatus {
    /// Idle. Matches `TBLOCK_DEFAULT`.
    Default,
    /// Running a single-query transaction. Matches
    /// `TBLOCK_STARTED`. In PostgreSQL, when using the extended query protocol, this
    /// may be upgraded into multi-statement implicit query (see [`Self::InTransactionImplicit`]).
    /// Additionally, some statements may trigger an eager commit of the implicit transaction,
    /// see: <https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=f92944137>. In
    /// Materialize however, we eagerly commit all statements outside of an explicit transaction
    /// when using the extended query protocol. Therefore, we can guarantee that this state will
    /// always be a single-query transaction and never be upgraded into a multi-statement implicit
    /// query.
    Started(Transaction),
    /// Currently in a transaction issued from a `BEGIN`. Matches `TBLOCK_INPROGRESS`.
    InTransaction(Transaction),
    /// Currently in an implicit transaction started from a multi-statement query
    /// with more than one statement. Matches `TBLOCK_IMPLICIT_INPROGRESS`.
    InTransactionImplicit(Transaction),
    /// In a failed transaction. Matches `TBLOCK_ABORT`.
    Failed(Transaction),
}
1113
1114impl TransactionStatus {
1115    /// Extracts the inner transaction ops and write lock guard if not failed.
1116    pub fn into_ops_and_lock_guard(self) -> (Option<TransactionOps>, Option<WriteLocks>) {
1117        match self {
1118            TransactionStatus::Default | TransactionStatus::Failed(_) => (None, None),
1119            TransactionStatus::Started(txn)
1120            | TransactionStatus::InTransaction(txn)
1121            | TransactionStatus::InTransactionImplicit(txn) => {
1122                (Some(txn.ops), txn.write_lock_guards)
1123            }
1124        }
1125    }
1126
1127    /// Exposes the inner transaction.
1128    pub fn inner(&self) -> Option<&Transaction> {
1129        match self {
1130            TransactionStatus::Default => None,
1131            TransactionStatus::Started(txn)
1132            | TransactionStatus::InTransaction(txn)
1133            | TransactionStatus::InTransactionImplicit(txn)
1134            | TransactionStatus::Failed(txn) => Some(txn),
1135        }
1136    }
1137
1138    /// Exposes the inner transaction.
1139    pub fn inner_mut(&mut self) -> Option<&mut Transaction> {
1140        match self {
1141            TransactionStatus::Default => None,
1142            TransactionStatus::Started(txn)
1143            | TransactionStatus::InTransaction(txn)
1144            | TransactionStatus::InTransactionImplicit(txn)
1145            | TransactionStatus::Failed(txn) => Some(txn),
1146        }
1147    }
1148
1149    /// Whether the transaction's ops are DDL.
1150    pub fn is_ddl(&self) -> bool {
1151        match self {
1152            TransactionStatus::Default => false,
1153            TransactionStatus::Started(txn)
1154            | TransactionStatus::InTransaction(txn)
1155            | TransactionStatus::InTransactionImplicit(txn)
1156            | TransactionStatus::Failed(txn) => {
1157                matches!(txn.ops, TransactionOps::DDL { .. })
1158            }
1159        }
1160    }
1161
1162    /// Expresses whether or not the transaction was implicitly started.
1163    /// However, its negation does not imply explicitly started.
1164    pub fn is_implicit(&self) -> bool {
1165        match self {
1166            TransactionStatus::Started(_) | TransactionStatus::InTransactionImplicit(_) => true,
1167            TransactionStatus::Default
1168            | TransactionStatus::InTransaction(_)
1169            | TransactionStatus::Failed(_) => false,
1170        }
1171    }
1172
1173    /// Whether the transaction may contain multiple statements.
1174    pub fn is_in_multi_statement_transaction(&self) -> bool {
1175        match self {
1176            TransactionStatus::InTransaction(_) | TransactionStatus::InTransactionImplicit(_) => {
1177                true
1178            }
1179            TransactionStatus::Default
1180            | TransactionStatus::Started(_)
1181            | TransactionStatus::Failed(_) => false,
1182        }
1183    }
1184
1185    /// Whether we are in a multi-statement transaction, AND the query is immediate.
1186    pub fn in_immediate_multi_stmt_txn(&self, when: &QueryWhen) -> bool {
1187        self.is_in_multi_statement_transaction() && when == &QueryWhen::Immediately
1188    }
1189
1190    /// Grants the writes lock to the inner transaction, returning an error if the transaction
1191    /// has already been granted write locks.
1192    ///
1193    /// # Panics
1194    /// If `self` is `TransactionStatus::Default`, which indicates that the
1195    /// transaction is idle, which is not appropriate to assign the
1196    /// coordinator's write lock to.
1197    ///
1198    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1199        match self {
1200            TransactionStatus::Default => panic!("cannot grant write lock to txn not yet started"),
1201            TransactionStatus::Started(txn)
1202            | TransactionStatus::InTransaction(txn)
1203            | TransactionStatus::InTransactionImplicit(txn)
1204            | TransactionStatus::Failed(txn) => txn.try_grant_write_locks(guards),
1205        }
1206    }
1207
1208    /// Returns the currently held [`WriteLocks`], if this transaction holds any.
1209    pub fn write_locks(&self) -> Option<&WriteLocks> {
1210        match self {
1211            TransactionStatus::Default => None,
1212            TransactionStatus::Started(txn)
1213            | TransactionStatus::InTransaction(txn)
1214            | TransactionStatus::InTransactionImplicit(txn)
1215            | TransactionStatus::Failed(txn) => txn.write_lock_guards.as_ref(),
1216        }
1217    }
1218
1219    /// The timeline of the transaction, if one exists.
1220    pub fn timeline(&self) -> Option<Timeline> {
1221        match self {
1222            TransactionStatus::Default => None,
1223            TransactionStatus::Started(txn)
1224            | TransactionStatus::InTransaction(txn)
1225            | TransactionStatus::InTransactionImplicit(txn)
1226            | TransactionStatus::Failed(txn) => txn.timeline(),
1227        }
1228    }
1229
1230    /// The cluster of the transaction, if one exists.
1231    pub fn cluster(&self) -> Option<ClusterId> {
1232        match self {
1233            TransactionStatus::Default => None,
1234            TransactionStatus::Started(txn)
1235            | TransactionStatus::InTransaction(txn)
1236            | TransactionStatus::InTransactionImplicit(txn)
1237            | TransactionStatus::Failed(txn) => txn.cluster(),
1238        }
1239    }
1240
1241    /// Snapshot of the catalog that reflects DDL operations run in this transaction.
1242    pub fn catalog_state(&self) -> Option<&CatalogState> {
1243        match self.inner() {
1244            Some(Transaction {
1245                ops: TransactionOps::DDL { state, .. },
1246                ..
1247            }) => Some(state),
1248            _ => None,
1249        }
1250    }
1251
1252    /// Reports whether any operations have been executed as part of this transaction
1253    pub fn contains_ops(&self) -> bool {
1254        match self.inner() {
1255            Some(txn) => txn.contains_ops(),
1256            None => false,
1257        }
1258    }
1259
1260    /// Checks whether the current state of this transaction allows writes
1261    /// (adding write ops).
1262    /// transaction
1263    pub fn allows_writes(&self) -> bool {
1264        match self {
1265            TransactionStatus::Started(Transaction { ops, access, .. })
1266            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1267            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1268                match ops {
1269                    TransactionOps::None => access != &Some(TransactionAccessMode::ReadOnly),
1270                    TransactionOps::Peeks { determination, .. } => {
1271                        // If-and-only-if peeks thus far do not have a timestamp
1272                        // (i.e. they are constant), we can switch to a write
1273                        // transaction.
1274                        !determination.timestamp_context.contains_timestamp()
1275                    }
1276                    TransactionOps::Subscribe => false,
1277                    TransactionOps::Writes(_) => true,
1278                    TransactionOps::SingleStatement { .. } => false,
1279                    TransactionOps::DDL { .. } => false,
1280                }
1281            }
1282            TransactionStatus::Default | TransactionStatus::Failed(_) => {
1283                unreachable!()
1284            }
1285        }
1286    }
1287
1288    /// Adds operations to the current transaction. An error is produced if they cannot be merged
1289    /// (i.e., a timestamp-dependent read cannot be merged to an insert).
1290    ///
1291    /// The `DDL` variant is an exception and does not merge operations, but instead overwrites the
1292    /// old ops with the new ops. This is correct because it is only used in conjunction with the
1293    /// Dry Run catalog op which returns an error containing all of the ops, and those ops are
1294    /// passed to this function which then overwrites.
1295    ///
1296    /// # Panics
1297    /// If the operations are compatible but the operation metadata doesn't match. Such as reads at
1298    /// different timestamps, reads on different timelines, reads on different clusters, etc. It's
1299    /// up to the caller to make sure these are aligned.
1300    pub fn add_ops(&mut self, add_ops: TransactionOps) -> Result<(), AdapterError> {
1301        match self {
1302            TransactionStatus::Started(Transaction { ops, access, .. })
1303            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1304            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1305                match ops {
1306                    TransactionOps::None => {
1307                        if matches!(access, Some(TransactionAccessMode::ReadOnly))
1308                            && matches!(add_ops, TransactionOps::Writes(_))
1309                        {
1310                            return Err(AdapterError::ReadOnlyTransaction);
1311                        }
1312                        *ops = add_ops;
1313                    }
1314                    TransactionOps::Peeks {
1315                        determination,
1316                        cluster_id,
1317                        requires_linearization,
1318                    } => match add_ops {
1319                        TransactionOps::Peeks {
1320                            determination: add_timestamp_determination,
1321                            cluster_id: add_cluster_id,
1322                            requires_linearization: add_requires_linearization,
1323                        } => {
1324                            assert_eq!(*cluster_id, add_cluster_id);
1325                            match (
1326                                &determination.timestamp_context,
1327                                &add_timestamp_determination.timestamp_context,
1328                            ) {
1329                                (
1330                                    TimestampContext::TimelineTimestamp {
1331                                        timeline: txn_timeline,
1332                                        chosen_ts: txn_ts,
1333                                        oracle_ts: _,
1334                                    },
1335                                    TimestampContext::TimelineTimestamp {
1336                                        timeline: add_timeline,
1337                                        chosen_ts: add_ts,
1338                                        oracle_ts: _,
1339                                    },
1340                                ) => {
1341                                    assert_eq!(txn_timeline, add_timeline);
1342                                    assert_eq!(txn_ts, add_ts);
1343                                }
1344                                (TimestampContext::NoTimestamp, _) => {
1345                                    *determination = add_timestamp_determination
1346                                }
1347                                (_, TimestampContext::NoTimestamp) => {}
1348                            };
1349                            if matches!(requires_linearization, RequireLinearization::NotRequired)
1350                                && matches!(
1351                                    add_requires_linearization,
1352                                    RequireLinearization::Required
1353                                )
1354                            {
1355                                *requires_linearization = add_requires_linearization;
1356                            }
1357                        }
1358                        // Iff peeks thus far do not have a timestamp (i.e.
1359                        // they are constant), we can switch to a write
1360                        // transaction.
1361                        writes @ TransactionOps::Writes(..)
1362                            if !determination.timestamp_context.contains_timestamp() =>
1363                        {
1364                            *ops = writes;
1365                        }
1366                        _ => return Err(AdapterError::ReadOnlyTransaction),
1367                    },
1368                    TransactionOps::Subscribe => {
1369                        return Err(AdapterError::SubscribeOnlyTransaction);
1370                    }
1371                    TransactionOps::Writes(txn_writes) => match add_ops {
1372                        TransactionOps::Writes(mut add_writes) => {
1373                            // We should have already checked the access above, but make sure we don't miss
1374                            // it anyway.
1375                            assert!(!matches!(access, Some(TransactionAccessMode::ReadOnly)));
1376                            txn_writes.append(&mut add_writes);
1377                        }
1378                        // Iff peeks do not have a timestamp (i.e. they are
1379                        // constant), we can permit them.
1380                        TransactionOps::Peeks { determination, .. }
1381                            if !determination.timestamp_context.contains_timestamp() => {}
1382                        _ => {
1383                            return Err(AdapterError::WriteOnlyTransaction);
1384                        }
1385                    },
1386                    TransactionOps::SingleStatement { .. } => {
1387                        return Err(AdapterError::SingleStatementTransaction);
1388                    }
1389                    TransactionOps::DDL {
1390                        ops: og_ops,
1391                        revision: og_revision,
1392                        state: og_state,
1393                        side_effects,
1394                        snapshot: og_snapshot,
1395                    } => match add_ops {
1396                        TransactionOps::DDL {
1397                            ops: new_ops,
1398                            revision: new_revision,
1399                            side_effects: mut net_new_side_effects,
1400                            state: new_state,
1401                            snapshot: new_snapshot,
1402                        } => {
1403                            if *og_revision != new_revision {
1404                                return Err(AdapterError::DDLTransactionRace);
1405                            }
1406                            // The old og_ops are overwritten, not extended.
1407                            if !new_ops.is_empty() {
1408                                *og_ops = new_ops;
1409                                *og_state = new_state;
1410                                *og_snapshot = new_snapshot;
1411                            }
1412                            side_effects.append(&mut net_new_side_effects);
1413                        }
1414                        _ => return Err(AdapterError::DDLOnlyTransaction),
1415                    },
1416                }
1417            }
1418            TransactionStatus::Default | TransactionStatus::Failed(_) => {
1419                unreachable!()
1420            }
1421        }
1422        Ok(())
1423    }
1424}
1425
/// An abstraction allowing us to identify different transactions.
///
/// A plain `u64`. IDs are only meaningful within a single connection.
pub type TransactionId = u64;
1428
1429impl Default for TransactionStatus {
1430    fn default() -> Self {
1431        TransactionStatus::Default
1432    }
1433}
1434
/// State data for transactions.
#[derive(Debug)]
pub struct Transaction {
    /// Plan context.
    pub pcx: PlanContext,
    /// Transaction operations.
    pub ops: TransactionOps,
    /// Uniquely identifies the transaction on a per connection basis.
    /// Two transactions started from separate connections may share the
    /// same ID.
    /// If all IDs have been exhausted, this will wrap around back to 0.
    pub id: TransactionId,
    /// Locks for objects this transaction will operate on. `None` until write locks are
    /// granted. Once granted, `try_grant_write_locks` refuses to replace them.
    write_lock_guards: Option<WriteLocks>,
    /// Access mode (read only, read write), if one was specified.
    access: Option<TransactionAccessMode>,
}
1452
1453impl Transaction {
1454    /// Tries to grant the write lock to this transaction for the remainder of its lifetime. Errors
1455    /// if this [`Transaction`] has already been granted write locks.
1456    fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1457        match &mut self.write_lock_guards {
1458            Some(existing) => Err(existing),
1459            locks @ None => {
1460                *locks = Some(guards);
1461                Ok(())
1462            }
1463        }
1464    }
1465
1466    /// The timeline of the transaction, if one exists.
1467    fn timeline(&self) -> Option<Timeline> {
1468        match &self.ops {
1469            TransactionOps::Peeks {
1470                determination:
1471                    TimestampDetermination {
1472                        timestamp_context: TimestampContext::TimelineTimestamp { timeline, .. },
1473                        ..
1474                    },
1475                ..
1476            } => Some(timeline.clone()),
1477            TransactionOps::Peeks { .. }
1478            | TransactionOps::None
1479            | TransactionOps::Subscribe
1480            | TransactionOps::Writes(_)
1481            | TransactionOps::SingleStatement { .. }
1482            | TransactionOps::DDL { .. } => None,
1483        }
1484    }
1485
1486    /// The cluster of the transaction, if one exists.
1487    pub fn cluster(&self) -> Option<ClusterId> {
1488        match &self.ops {
1489            TransactionOps::Peeks { cluster_id, .. } => Some(cluster_id.clone()),
1490            TransactionOps::None
1491            | TransactionOps::Subscribe
1492            | TransactionOps::Writes(_)
1493            | TransactionOps::SingleStatement { .. }
1494            | TransactionOps::DDL { .. } => None,
1495        }
1496    }
1497
1498    /// Reports whether any operations have been executed as part of this transaction
1499    fn contains_ops(&self) -> bool {
1500        !matches!(self.ops, TransactionOps::None)
1501    }
1502}
1503
/// A transaction's status code.
///
/// Each code converts to a single ASCII byte (`I`, `T`, or `E`) via `From<TransactionCode> for u8`.
#[derive(Debug, Clone, Copy)]
pub enum TransactionCode {
    /// Not currently in a transaction.
    Idle,
    /// Currently in a transaction.
    InTransaction,
    /// Currently in a transaction block which has failed.
    Failed,
}
1514
1515impl From<TransactionCode> for u8 {
1516    fn from(code: TransactionCode) -> Self {
1517        match code {
1518            TransactionCode::Idle => b'I',
1519            TransactionCode::InTransaction => b'T',
1520            TransactionCode::Failed => b'E',
1521        }
1522    }
1523}
1524
1525impl From<TransactionCode> for String {
1526    fn from(code: TransactionCode) -> Self {
1527        char::from(u8::from(code)).to_string()
1528    }
1529}
1530
1531impl From<&TransactionStatus> for TransactionCode {
1532    /// Convert from the Session's version
1533    fn from(status: &TransactionStatus) -> TransactionCode {
1534        match status {
1535            TransactionStatus::Default => TransactionCode::Idle,
1536            TransactionStatus::Started(_) => TransactionCode::InTransaction,
1537            TransactionStatus::InTransaction(_) => TransactionCode::InTransaction,
1538            TransactionStatus::InTransactionImplicit(_) => TransactionCode::InTransaction,
1539            TransactionStatus::Failed(_) => TransactionCode::Failed,
1540        }
1541    }
1542}
1543
1544/// The type of operation being performed by the transaction.
1545///
1546/// This is needed because we currently do not allow mixing reads and writes in
1547/// a transaction. Use this to record what we have done, and what may need to
1548/// happen at commit.
1549#[derive(Derivative)]
1550#[derivative(Debug)]
1551pub enum TransactionOps {
1552    /// The transaction has been initiated, but no statement has yet been executed
1553    /// in it.
1554    None,
1555    /// This transaction has had a peek (`SELECT`, `SUBSCRIBE`). If the inner value
1556    /// is has a timestamp, it must only do other peeks. However, if it doesn't
1557    /// have a timestamp (i.e. the values are constants), the transaction can still
1558    /// perform writes.
1559    Peeks {
1560        /// The timestamp and timestamp related metadata for the peek.
1561        determination: TimestampDetermination,
1562        /// The cluster used to execute peeks.
1563        cluster_id: ClusterId,
1564        /// Whether this peek needs to be linearized.
1565        requires_linearization: RequireLinearization,
1566    },
1567    /// This transaction has done a `SUBSCRIBE` and must do nothing else.
1568    Subscribe,
1569    /// This transaction has had a write (`INSERT`, `UPDATE`, `DELETE`) and must
1570    /// only do other writes, or reads whose timestamp is None (i.e. constants).
1571    Writes(Vec<WriteOp>),
1572    /// This transaction has a prospective statement that will execute during commit.
1573    SingleStatement {
1574        /// The prospective statement.
1575        stmt: Arc<Statement<Raw>>,
1576        /// The statement params.
1577        params: mz_sql::plan::Params,
1578    },
1579    /// This transaction has run some _simple_ DDL and must do nothing else. Any statement/plan that
1580    /// uses this must return false in `must_serialize_ddl()` because this is serialized instead in
1581    /// `sequence_plan()` during `COMMIT`.
1582    DDL {
1583        /// Catalog operations that have already run, and must run before each subsequent op.
1584        ops: Vec<crate::catalog::Op>,
1585        /// In-memory state that reflects the previously applied ops.
1586        state: CatalogState,
1587        /// A list of side effects that should be executed if this DDL transaction commits.
1588        #[derivative(Debug = "ignore")]
1589        side_effects: Vec<
1590            Box<
1591                dyn for<'a> FnOnce(
1592                        &'a mut Coordinator,
1593                        Option<&'a mut ExecuteContext>,
1594                    ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
1595                    + Send
1596                    + Sync,
1597            >,
1598        >,
1599        /// Transient revision of the `Catalog` when this transaction started.
1600        revision: u64,
1601        /// Snapshot of the durable transaction state after the last dry run.
1602        /// Used to initialize the next dry run's transaction so it starts
1603        /// in sync with the accumulated `state`. `None` for the first
1604        /// statement in the transaction (before any dry run).
1605        snapshot: Option<Snapshot>,
1606    },
1607}
1608
1609impl TransactionOps {
1610    fn timestamp_determination(self) -> Option<TimestampDetermination> {
1611        match self {
1612            TransactionOps::Peeks { determination, .. } => Some(determination),
1613            TransactionOps::None
1614            | TransactionOps::Subscribe
1615            | TransactionOps::Writes(_)
1616            | TransactionOps::SingleStatement { .. }
1617            | TransactionOps::DDL { .. } => None,
1618        }
1619    }
1620}
1621
1622impl Default for TransactionOps {
1623    fn default() -> Self {
1624        Self::None
1625    }
1626}
1627
1628/// An `INSERT` waiting to be committed.
1629#[derive(Debug, Clone, PartialEq)]
1630pub struct WriteOp {
1631    /// The target table.
1632    pub id: CatalogItemId,
1633    /// The data rows.
1634    pub rows: TableData,
1635}
1636
/// Records whether a transaction's peeks must be linearized.
#[derive(Debug)]
pub enum RequireLinearization {
    /// The transaction must be linearized.
    Required,
    /// The transaction may skip linearization.
    NotRequired,
}
1645
1646impl From<&ExplainContext> for RequireLinearization {
1647    fn from(ctx: &ExplainContext) -> Self {
1648        match ctx {
1649            ExplainContext::None | ExplainContext::PlanInsightsNotice(_) => {
1650                RequireLinearization::Required
1651            }
1652            _ => RequireLinearization::NotRequired,
1653        }
1654    }
1655}
1656
1657/// A complete set of exclusive locks for writing to collections identified by [`CatalogItemId`]s.
1658///
1659/// To prevent deadlocks between two sessions, we do not allow acquiring a partial set of locks.
1660#[derive(Debug)]
1661pub struct WriteLocks {
1662    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
1663    /// Connection that currently holds these locks, used for tracing purposes only.
1664    conn_id: ConnectionId,
1665}
1666
1667impl WriteLocks {
1668    /// Create a [`WriteLocksBuilder`] pre-defining all of the locks we need.
1669    ///
1670    /// When "finishing" the builder with [`WriteLocksBuilder::all_or_nothing`], if we haven't
1671    /// acquired all of the necessary locks we drop any partially acquired ones.
1672    pub fn builder(sources: impl IntoIterator<Item = CatalogItemId>) -> WriteLocksBuilder {
1673        let locks = sources.into_iter().map(|gid| (gid, None)).collect();
1674        WriteLocksBuilder { locks }
1675    }
1676
1677    /// Validate this set of [`WriteLocks`] is sufficient for the provided collections.
1678    /// Dropping the currently held locks if it's not.
1679    pub fn validate(
1680        self,
1681        collections: impl Iterator<Item = CatalogItemId>,
1682    ) -> Result<Self, BTreeSet<CatalogItemId>> {
1683        let mut missing = BTreeSet::new();
1684        for collection in collections {
1685            if !self.locks.contains_key(&collection) {
1686                missing.insert(collection);
1687            }
1688        }
1689
1690        if missing.is_empty() {
1691            Ok(self)
1692        } else {
1693            // Explicitly drop the already acquired locks.
1694            drop(self);
1695            Err(missing)
1696        }
1697    }
1698}
1699
1700impl Drop for WriteLocks {
1701    fn drop(&mut self) {
1702        // We may have merged the locks into GroupCommitWriteLocks, thus it could be empty.
1703        if !self.locks.is_empty() {
1704            tracing::info!(
1705                conn_id = %self.conn_id,
1706                locks = ?self.locks,
1707                "dropping write locks",
1708            );
1709        }
1710    }
1711}
1712
1713/// A builder struct that helps us acquire all of the locks we need, or none of them.
1714///
1715/// See [`WriteLocks::builder`].
1716#[derive(Debug)]
1717pub struct WriteLocksBuilder {
1718    locks: BTreeMap<CatalogItemId, Option<tokio::sync::OwnedMutexGuard<()>>>,
1719}
1720
1721impl WriteLocksBuilder {
1722    /// Adds a lock to this builder.
1723    pub fn insert_lock(&mut self, id: CatalogItemId, lock: tokio::sync::OwnedMutexGuard<()>) {
1724        self.locks.insert(id, Some(lock));
1725    }
1726
1727    /// Finish this builder by returning either all of the necessary locks, or none of them.
1728    ///
1729    /// If we fail to acquire all of the locks, returns one of the [`CatalogItemId`]s that we
1730    /// failed to acquire a lock for, that should be awaited so we know when to run again.
1731    pub fn all_or_nothing(self, conn_id: &ConnectionId) -> Result<WriteLocks, CatalogItemId> {
1732        let (locks, missing): (BTreeMap<_, _>, BTreeSet<_>) =
1733            self.locks
1734                .into_iter()
1735                .partition_map(|(gid, lock)| match lock {
1736                    Some(lock) => itertools::Either::Left((gid, lock)),
1737                    None => itertools::Either::Right(gid),
1738                });
1739
1740        match missing.iter().next() {
1741            None => {
1742                tracing::info!(%conn_id, ?locks, "acquired write locks");
1743                Ok(WriteLocks {
1744                    locks,
1745                    conn_id: conn_id.clone(),
1746                })
1747            }
1748            Some(gid) => {
1749                tracing::info!(?missing, "failed to acquire write locks");
1750                // Explicitly drop the already acquired locks.
1751                drop(locks);
1752                Err(*gid)
1753            }
1754        }
1755    }
1756}
1757
1758/// Collection of [`WriteLocks`] gathered during [`group_commit`].
1759///
1760/// Note: This struct should __never__ be used outside of group commit because it attempts to merge
1761/// together several collections of [`WriteLocks`] which if not done carefully can cause deadlocks
1762/// or consistency violations.
1763///
1764/// We must prevent writes from occurring to tables during read then write plans (e.g. `UPDATE`)
1765/// but we can allow blind writes (e.g. `INSERT`) to get committed concurrently at the same
1766/// timestamp when submitting the updates from a read then write plan.
1767///
1768/// Naively it would seem as though we could allow blind writes to occur whenever as blind writes
1769/// could never cause invalid retractions, but it could cause us to violate serializability because
1770/// there is no total order we could define for the transactions. Consider the following scenario:
1771///
1772/// ```text
1773/// table: foo
1774///
1775///  a | b
1776/// --------
1777///  x   2
1778///  y   3
1779///  z   4
1780///
1781/// -- Session(A)
1782/// -- read then write plan, reads at t0, writes at t3, transaction Ta
1783/// DELETE FROM foo WHERE b % 2 = 0;
1784///
1785///
1786/// -- Session(B)
1787/// -- blind write into foo, writes at t1, transaction Tb
1788/// INSERT INTO foo VALUES ('q', 6);
1789/// -- select from foo, reads at t2, transaction Tc
1790/// SELECT * FROM foo;
1791///
1792///
1793/// The times these operations occur at are ordered:
1794/// t0 < t1 < t2 < t3
1795///
1796/// Given the timing of the operations, the transactions must have the following order:
1797///
1798/// * Ta does not observe ('q', 6), so Ta < Tb
1799/// * Tc does observe ('q', 6), so Tb < Tc
1800/// * Tc does not observe the retractions from Ta, so Tc < Ta
1801///
1802/// For total order to exist, Ta < Tb < Tc < Ta, which is impossible.
1803/// ```
1804///
1805/// [`group_commit`]: super::coord::Coordinator::group_commit
1806#[derive(Debug, Default)]
1807pub(crate) struct GroupCommitWriteLocks {
1808    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
1809}
1810
1811impl GroupCommitWriteLocks {
1812    /// Merge a set of [`WriteLocks`] into this collection for group commit.
1813    pub fn merge(&mut self, mut locks: WriteLocks) {
1814        // Note: Ideally we would use `.drain`, but that method doesn't exist for BTreeMap.
1815        //
1816        // See: <https://github.com/rust-lang/rust/issues/81074>
1817        let existing = std::mem::take(&mut locks.locks);
1818        self.locks.extend(existing);
1819    }
1820
1821    /// Returns the collections we're missing locks for, if any.
1822    pub fn missing_locks(
1823        &self,
1824        writes: impl Iterator<Item = CatalogItemId>,
1825    ) -> BTreeSet<CatalogItemId> {
1826        let mut missing = BTreeSet::new();
1827        for write in writes {
1828            if !self.locks.contains_key(&write) {
1829                missing.insert(write);
1830            }
1831        }
1832        missing
1833    }
1834}
1835
1836impl Drop for GroupCommitWriteLocks {
1837    fn drop(&mut self) {
1838        if !self.locks.is_empty() {
1839            tracing::info!(
1840                locks = ?self.locks,
1841                "dropping group commit write locks",
1842            );
1843        }
1844    }
1845}