mz_adapter/
session.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Per-connection configuration parameters and state.
11
12#![warn(missing_docs)]
13
14use std::collections::btree_map::Entry;
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Debug;
17use std::future::Future;
18use std::mem;
19use std::net::IpAddr;
20use std::sync::Arc;
21
22use chrono::{DateTime, Utc};
23use derivative::Derivative;
24use itertools::Itertools;
25use mz_adapter_types::connection::ConnectionId;
26use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
27use mz_controller_types::ClusterId;
28use mz_ore::metrics::{MetricsFutureExt, MetricsRegistry};
29use mz_ore::now::{EpochMillis, NowFn};
30use mz_pgwire_common::Format;
31use mz_repr::role_id::RoleId;
32use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
33use mz_repr::{CatalogItemId, Datum, Row, RowIterator, ScalarType, TimestampManipulation};
34use mz_sql::ast::{AstInfo, Raw, Statement, TransactionAccessMode};
35use mz_sql::plan::{Params, PlanContext, QueryWhen, StatementDesc};
36use mz_sql::session::metadata::SessionMetadata;
37use mz_sql::session::user::{
38    INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER, RoleMetadata, SYSTEM_USER, User,
39};
40use mz_sql::session::vars::IsolationLevel;
41pub use mz_sql::session::vars::{
42    DEFAULT_DATABASE_NAME, EndTransactionAction, SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION,
43    SERVER_PATCH_VERSION, SessionVars, Var,
44};
45use mz_sql_parser::ast::TransactionIsolationLevel;
46use mz_storage_client::client::TableData;
47use mz_storage_types::sources::Timeline;
48use qcell::{QCell, QCellOwner};
49use rand::Rng;
50use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
51use tokio::sync::watch;
52use uuid::Uuid;
53
54use crate::AdapterNotice;
55use crate::catalog::CatalogState;
56use crate::client::RecordFirstRowStream;
57use crate::coord::ExplainContext;
58use crate::coord::appends::BuiltinTableAppendNotify;
59use crate::coord::in_memory_oracle::InMemoryTimestampOracle;
60use crate::coord::peek::PeekResponseUnary;
61use crate::coord::statement_logging::PreparedStatementLoggingInfo;
62use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
63use crate::error::AdapterError;
64use crate::metrics::{Metrics, SessionMetrics};
65
66const DUMMY_CONNECTION_ID: ConnectionId = ConnectionId::Static(0);
67
/// A session holds per-connection state.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Session<T = mz_repr::Timestamp>
where
    T: Debug + Clone + Send + Sync,
{
    /// The connection ID of this session. Unlike `uuid`, it may be reused
    /// after the session terminates.
    conn_id: ConnectionId,
    /// A globally unique identifier for the session. Not to be confused
    /// with `conn_id`, which may be reused.
    uuid: Uuid,
    /// Prepared statements, keyed by name.
    prepared_statements: BTreeMap<String, PreparedStatement>,
    /// Portals, keyed by name. All portals are destroyed when the
    /// transaction is cleared.
    portals: BTreeMap<String, Portal>,
    /// The state of the current transaction, if any.
    transaction: TransactionStatus<T>,
    // NOTE(review): in the reviewed portion of this file this field is only
    // initialized to `None` and reset to `None` by `clear_transaction`;
    // `SessionMetadata::pcx` reads the transaction's own plan context instead.
    // Confirm whether it is still needed.
    pcx: Option<PlanContext>,
    /// Per-session metrics.
    metrics: SessionMetrics,
    // Excluded from `Debug` output. Initialized to `None` when the session is
    // created.
    #[derivative(Debug = "ignore")]
    builtin_updates: Option<BuiltinTableAppendNotify>,

    /// The role metadata of the current session.
    ///
    /// Invariant: role_metadata must be `Some` after the user has
    /// successfully connected to and authenticated with Materialize.
    ///
    /// Prefer using this value over the session user's name
    /// (`Self.user.name`).
    //
    // It would be better for this not to be an Option, but the
    // `Session` is initialized before the user has connected to
    // Materialize and is able to look up the `RoleMetadata`. The `Session`
    // is also used to return an error when no role exists and
    // therefore there is no valid `RoleMetadata`.
    role_metadata: Option<RoleMetadata>,
    /// The IP address of the connected client, if known.
    client_ip: Option<IpAddr>,
    /// The session variables.
    vars: SessionVars,
    /// Sending half of the session's notice channel. Clones are handed out by
    /// `retain_notice_transmitter`.
    notices_tx: mpsc::UnboundedSender<AdapterNotice>,
    /// Receiving half of the session's notice channel. Because the session
    /// also owns `notices_tx`, receiving on this channel never yields `None`.
    notices_rx: mpsc::UnboundedReceiver<AdapterNotice>,
    /// The ID to assign to the next transaction started by this session.
    /// Advanced with wrapping arithmetic.
    next_transaction_id: TransactionId,
    /// A key generated randomly when the session is created.
    secret_key: u32,
    /// Optional receiver polled by `apply_external_metadata_updates` for
    /// changes to the user's external metadata.
    external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    // Token allowing us to access `Arc<QCell<StatementLogging>>`
    // metadata. We want these to be reference-counted, because the same
    // statement might be referenced from multiple portals simultaneously.
    //
    // However, they can't be `Rc<RefCell<StatementLogging>>`, because
    // the `Session` is sent around to different threads.
    //
    // On the other hand, they don't need to be
    // `Arc<Mutex<StatementLogging>>`, because they will always be
    // accessed from the same thread that the `Session` is currently
    // on. We express this by gating access with this token.
    #[derivative(Debug = "ignore")]
    qcell_owner: QCellOwner,
    /// In-memory timestamp oracles, one per timeline, created on demand by
    /// `ensure_timestamp_oracle`.
    session_oracles: BTreeMap<Timeline, InMemoryTimestampOracle<T, NowFn<T>>>,
}
122
123impl<T> SessionMetadata for Session<T>
124where
125    T: Debug + Clone + Send + Sync,
126    T: TimestampManipulation,
127{
128    fn conn_id(&self) -> &ConnectionId {
129        &self.conn_id
130    }
131
132    fn client_ip(&self) -> Option<&IpAddr> {
133        self.client_ip.as_ref()
134    }
135
136    fn pcx(&self) -> &PlanContext {
137        &self
138            .transaction()
139            .inner()
140            .expect("no active transaction")
141            .pcx
142    }
143
144    fn role_metadata(&self) -> &RoleMetadata {
145        self.role_metadata
146            .as_ref()
147            .expect("role_metadata invariant violated")
148    }
149
150    fn vars(&self) -> &SessionVars {
151        &self.vars
152    }
153}
154
/// Data structure suitable for passing to other threads that need access to some common Session
/// properties.
#[derive(Debug)]
pub struct SessionMeta {
    /// Connection ID of the originating session.
    conn_id: ConnectionId,
    /// Client IP address of the originating session, if known.
    client_ip: Option<IpAddr>,
    /// Copy of the originating session's plan context (see `Session::meta`).
    pcx: PlanContext,
    /// Copy of the originating session's role metadata.
    role_metadata: RoleMetadata,
    /// Copy of the originating session's variables.
    vars: SessionVars,
}
165
166impl SessionMetadata for SessionMeta {
167    fn vars(&self) -> &SessionVars {
168        &self.vars
169    }
170
171    fn conn_id(&self) -> &ConnectionId {
172        &self.conn_id
173    }
174
175    fn client_ip(&self) -> Option<&IpAddr> {
176        self.client_ip.as_ref()
177    }
178
179    fn pcx(&self) -> &PlanContext {
180        &self.pcx
181    }
182
183    fn role_metadata(&self) -> &RoleMetadata {
184        &self.role_metadata
185    }
186}
187
/// Configures a new [`Session`].
#[derive(Debug, Clone)]
pub struct SessionConfig {
    /// The connection ID for the session.
    ///
    /// May be reused after the session terminates.
    pub conn_id: ConnectionId,
    /// A universally unique identifier for the session, across all processes,
    /// regions, and all time.
    ///
    /// Must not be reused, even after the session terminates.
    pub uuid: Uuid,
    /// The peer IP address of the client, if known.
    pub client_ip: Option<IpAddr>,
    /// The name of the user associated with the session.
    pub user: String,
    /// An optional receiver that the session will periodically check for
    /// updates to a user's external metadata.
    pub external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    /// The internal metadata of the user associated with the session, if any.
    pub internal_user_metadata: Option<InternalUserMetadata>,
    /// The Helm chart version, if any. It is passed through to the session
    /// variables when the session is created.
    pub helm_chart_version: Option<String>,
}
212
213impl<T: TimestampManipulation> Session<T> {
214    /// Creates a new session for the specified connection ID.
215    pub(crate) fn new(
216        build_info: &'static BuildInfo,
217        config: SessionConfig,
218        metrics: SessionMetrics,
219    ) -> Session<T> {
220        assert_ne!(config.conn_id, DUMMY_CONNECTION_ID);
221        Self::new_internal(build_info, config, metrics)
222    }
223
224    /// Returns a reference-less collection of data usable by other tasks that don't have ownership
225    /// of the Session.
226    pub fn meta(&self) -> SessionMeta {
227        SessionMeta {
228            conn_id: self.conn_id().clone(),
229            client_ip: self.client_ip().copied(),
230            pcx: self.pcx().clone(),
231            role_metadata: self.role_metadata().clone(),
232            vars: self.vars.clone(),
233        }
234
235        // TODO: soft_assert that these are the same as Session.
236    }
237
238    /// Creates new statement logging metadata for a one-off
239    /// statement.
240    // Normally, such logging information would be created as part of
241    // allocating a new prepared statement, and a refcounted handle
242    // would be copied from that prepared statement to portals during
243    // binding. However, we also support (via `Command::declare`)
244    // binding a statement directly to a portal without creating an
245    // intermediate prepared statement. Thus, for those cases, a
246    // mechanism for generating the logging metadata directly is needed.
247    pub(crate) fn mint_logging<A: AstInfo>(
248        &self,
249        raw_sql: String,
250        stmt: Option<&Statement<A>>,
251        now: EpochMillis,
252    ) -> Arc<QCell<PreparedStatementLoggingInfo>> {
253        Arc::new(QCell::new(
254            &self.qcell_owner,
255            PreparedStatementLoggingInfo::still_to_log(
256                raw_sql,
257                stmt,
258                now,
259                "".to_string(),
260                self.uuid,
261                false,
262            ),
263        ))
264    }
265
266    pub(crate) fn qcell_rw<'a, T2: 'a>(&'a mut self, cell: &'a Arc<QCell<T2>>) -> &'a mut T2 {
267        self.qcell_owner.rw(&*cell)
268    }
269
270    /// Returns a unique ID for the session.
271    /// Not to be confused with `connection_id`, which can be reused.
272    pub fn uuid(&self) -> Uuid {
273        self.uuid
274    }
275
276    /// Creates a new dummy session.
277    ///
278    /// Dummy sessions are intended for use when executing queries on behalf of
279    /// the system itself, rather than on behalf of a user.
280    pub fn dummy() -> Session<T> {
281        let registry = MetricsRegistry::new();
282        let metrics = Metrics::register_into(&registry);
283        let metrics = metrics.session_metrics();
284        let mut dummy = Self::new_internal(
285            &DUMMY_BUILD_INFO,
286            SessionConfig {
287                conn_id: DUMMY_CONNECTION_ID,
288                uuid: Uuid::new_v4(),
289                user: SYSTEM_USER.name.clone(),
290                client_ip: None,
291                external_metadata_rx: None,
292                internal_user_metadata: None,
293                helm_chart_version: None,
294            },
295            metrics,
296        );
297        dummy.initialize_role_metadata(RoleId::User(0));
298        dummy
299    }
300
301    fn new_internal(
302        build_info: &'static BuildInfo,
303        SessionConfig {
304            conn_id,
305            uuid,
306            user,
307            client_ip,
308            mut external_metadata_rx,
309            internal_user_metadata,
310            helm_chart_version,
311        }: SessionConfig,
312        metrics: SessionMetrics,
313    ) -> Session<T> {
314        let (notices_tx, notices_rx) = mpsc::unbounded_channel();
315        let default_cluster = INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER.get(&user);
316        let user = User {
317            name: user,
318            internal_metadata: internal_user_metadata,
319            external_metadata: external_metadata_rx
320                .as_mut()
321                .map(|rx| rx.borrow_and_update().clone()),
322        };
323        let mut vars = SessionVars::new_unchecked(build_info, user, helm_chart_version);
324        if let Some(default_cluster) = default_cluster {
325            vars.set_cluster(default_cluster.clone());
326        }
327        Session {
328            conn_id,
329            uuid,
330            transaction: TransactionStatus::Default,
331            pcx: None,
332            metrics,
333            builtin_updates: None,
334            prepared_statements: BTreeMap::new(),
335            portals: BTreeMap::new(),
336            role_metadata: None,
337            client_ip,
338            vars,
339            notices_tx,
340            notices_rx,
341            next_transaction_id: 0,
342            secret_key: rand::thread_rng().r#gen(),
343            external_metadata_rx,
344            qcell_owner: QCellOwner::new(),
345            session_oracles: BTreeMap::new(),
346        }
347    }
348
349    /// Returns the secret key associated with the session.
350    pub fn secret_key(&self) -> u32 {
351        self.secret_key
352    }
353
354    fn new_pcx(&self, mut wall_time: DateTime<Utc>) -> PlanContext {
355        if let Some(mock_time) = self.vars().unsafe_new_transaction_wall_time() {
356            wall_time = *mock_time;
357        }
358        PlanContext::new(wall_time)
359    }
360
    /// Starts an explicit transaction, or changes an implicit to an explicit
    /// transaction.
    ///
    /// If no transaction is in progress, a new `InTransaction` transaction is
    /// created and assigned the next transaction ID. Otherwise the existing
    /// `Started`/`InTransactionImplicit`/`InTransaction` transaction becomes
    /// `InTransaction`. Its access mode is replaced only when `access` is
    /// `Some`. When `isolation_level` is `Some`, it is applied as the
    /// transaction-local isolation level.
    ///
    /// # Errors
    /// Returns [`AdapterError::ReadWriteUnavailable`] if `access` is
    /// `READ WRITE` while the current transaction is `READ ONLY` and has
    /// already performed a peek or subscribe.
    ///
    /// # Panics
    /// Reaches `unreachable!()` if the current status is
    /// `TransactionStatus::Failed` and the access check did not return an
    /// error first.
    pub fn start_transaction(
        &mut self,
        wall_time: DateTime<Utc>,
        access: Option<TransactionAccessMode>,
        isolation_level: Option<TransactionIsolationLevel>,
    ) -> Result<(), AdapterError> {
        // Check that current transaction state is compatible with new `access`
        if let Some(txn) = self.transaction.inner() {
            // `READ WRITE` prohibited if both:
            // - Currently in `READ ONLY`
            // - Already performed a query (a peek or subscribe)
            let read_write_prohibited = match txn.ops {
                TransactionOps::Peeks { .. } | TransactionOps::Subscribe => {
                    txn.access == Some(TransactionAccessMode::ReadOnly)
                }
                TransactionOps::None
                | TransactionOps::Writes(_)
                | TransactionOps::SingleStatement { .. }
                | TransactionOps::DDL { .. } => false,
            };

            if read_write_prohibited && access == Some(TransactionAccessMode::ReadWrite) {
                return Err(AdapterError::ReadWriteUnavailable);
            }
        }

        // Take ownership of the current status (leaving `Default` behind) so
        // the inner transaction can be moved into the new status.
        match std::mem::take(&mut self.transaction) {
            TransactionStatus::Default => {
                // Transaction IDs wrap around rather than overflow.
                let id = self.next_transaction_id;
                self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
                self.transaction = TransactionStatus::InTransaction(Transaction {
                    pcx: self.new_pcx(wall_time),
                    ops: TransactionOps::None,
                    write_lock_guards: None,
                    access,
                    id,
                });
            }
            TransactionStatus::Started(mut txn)
            | TransactionStatus::InTransactionImplicit(mut txn)
            | TransactionStatus::InTransaction(mut txn) => {
                // Keep the existing access mode unless a new one was requested.
                if access.is_some() {
                    txn.access = access;
                }
                self.transaction = TransactionStatus::InTransaction(txn);
            }
            TransactionStatus::Failed(_) => unreachable!(),
        };

        if let Some(isolation_level) = isolation_level {
            self.vars
                .set_local_transaction_isolation(isolation_level.into());
        }

        Ok(())
    }
419
420    /// Starts either a single statement or implicit transaction based on the
421    /// number of statements, but only if no transaction has been started already.
422    pub fn start_transaction_implicit(&mut self, wall_time: DateTime<Utc>, stmts: usize) {
423        if let TransactionStatus::Default = self.transaction {
424            let id = self.next_transaction_id;
425            self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
426            let txn = Transaction {
427                pcx: self.new_pcx(wall_time),
428                ops: TransactionOps::None,
429                write_lock_guards: None,
430                access: None,
431                id,
432            };
433            match stmts {
434                1 => self.transaction = TransactionStatus::Started(txn),
435                n if n > 1 => self.transaction = TransactionStatus::InTransactionImplicit(txn),
436                _ => {}
437            }
438        }
439    }
440
441    /// Starts a single statement transaction, but only if no transaction has been started already.
442    pub fn start_transaction_single_stmt(&mut self, wall_time: DateTime<Utc>) {
443        self.start_transaction_implicit(wall_time, 1);
444    }
445
446    /// Clears a transaction, setting its state to Default and destroying all
447    /// portals. Returned are:
448    /// - sinks that were started in this transaction and need to be dropped
449    /// - the cleared transaction so its operations can be handled
450    ///
451    /// The [Postgres protocol docs](https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) specify:
452    /// > a named portal object lasts till the end of the current transaction
453    /// and
454    /// > An unnamed portal is destroyed at the end of the transaction
455    #[must_use]
456    pub fn clear_transaction(&mut self) -> TransactionStatus<T> {
457        self.portals.clear();
458        self.pcx = None;
459        mem::take(&mut self.transaction)
460    }
461
462    /// Marks the current transaction as failed.
463    pub fn fail_transaction(mut self) -> Self {
464        match self.transaction {
465            TransactionStatus::Default => unreachable!(),
466            TransactionStatus::Started(txn)
467            | TransactionStatus::InTransactionImplicit(txn)
468            | TransactionStatus::InTransaction(txn) => {
469                self.transaction = TransactionStatus::Failed(txn);
470            }
471            TransactionStatus::Failed(_) => {}
472        };
473        self
474    }
475
476    /// Returns the current transaction status.
477    pub fn transaction(&self) -> &TransactionStatus<T> {
478        &self.transaction
479    }
480
481    /// Returns the current transaction status.
482    pub fn transaction_mut(&mut self) -> &mut TransactionStatus<T> {
483        &mut self.transaction
484    }
485
486    /// Returns the session's transaction code.
487    pub fn transaction_code(&self) -> TransactionCode {
488        self.transaction().into()
489    }
490
491    /// Adds operations to the current transaction. An error is produced if
492    /// they cannot be merged (i.e., a timestamp-dependent read cannot be
493    /// merged to an insert).
494    pub fn add_transaction_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
495        self.transaction.add_ops(add_ops)
496    }
497
498    /// Returns a channel on which to send notices to the session.
499    pub fn retain_notice_transmitter(&self) -> UnboundedSender<AdapterNotice> {
500        self.notices_tx.clone()
501    }
502
503    /// Adds a notice to the session.
504    pub fn add_notice(&self, notice: AdapterNotice) {
505        self.add_notices([notice])
506    }
507
508    /// Adds multiple notices to the session.
509    pub fn add_notices(&self, notices: impl IntoIterator<Item = AdapterNotice>) {
510        for notice in notices {
511            let _ = self.notices_tx.send(notice);
512        }
513    }
514
515    /// Awaits a possible notice.
516    ///
517    /// This method is cancel safe.
518    pub async fn recv_notice(&mut self) -> AdapterNotice {
519        // This method is cancel safe because recv is cancel safe.
520        loop {
521            let notice = self
522                .notices_rx
523                .recv()
524                .await
525                .expect("Session also holds a sender, so recv won't ever return None");
526            match self.notice_filter(notice) {
527                Some(notice) => return notice,
528                None => continue,
529            }
530        }
531    }
532
533    /// Returns a draining iterator over the notices attached to the session.
534    pub fn drain_notices(&mut self) -> Vec<AdapterNotice> {
535        let mut notices = Vec::new();
536        while let Ok(notice) = self.notices_rx.try_recv() {
537            if let Some(notice) = self.notice_filter(notice) {
538                notices.push(notice);
539            }
540        }
541        notices
542    }
543
544    /// Returns Some if the notice should be reported, otherwise None.
545    fn notice_filter(&self, notice: AdapterNotice) -> Option<AdapterNotice> {
546        // Filter out low threshold severity.
547        let minimum_client_severity = self.vars.client_min_messages();
548        let sev = notice.severity();
549        if !minimum_client_severity.should_output_to_client(&sev) {
550            return None;
551        }
552        // Filter out notices for other clusters.
553        if let AdapterNotice::ClusterReplicaStatusChanged { cluster, .. } = &notice {
554            if cluster != self.vars.cluster() {
555                return None;
556            }
557        }
558        Some(notice)
559    }
560
561    /// Sets the transaction ops to `TransactionOps::None`. Must only be used after
562    /// verifying that no transaction anomalies will occur if cleared.
563    pub fn clear_transaction_ops(&mut self) {
564        if let Some(txn) = self.transaction.inner_mut() {
565            txn.ops = TransactionOps::None;
566        }
567    }
568
569    /// If the current transaction ops belong to a read, then sets the
570    /// ops to `None`, returning the old read timestamp context if
571    /// any existed. Must only be used after verifying that no transaction
572    /// anomalies will occur if cleared.
573    pub fn take_transaction_timestamp_context(&mut self) -> Option<TimestampContext<T>> {
574        if let Some(Transaction { ops, .. }) = self.transaction.inner_mut() {
575            if let TransactionOps::Peeks { .. } = ops {
576                let ops = std::mem::take(ops);
577                Some(
578                    ops.timestamp_determination()
579                        .expect("checked above")
580                        .timestamp_context,
581                )
582            } else {
583                None
584            }
585        } else {
586            None
587        }
588    }
589
590    /// Returns the transaction's read timestamp determination, if set.
591    ///
592    /// Returns `None` if there is no active transaction, or if the active
593    /// transaction is not a read transaction.
594    pub fn get_transaction_timestamp_determination(&self) -> Option<TimestampDetermination<T>> {
595        match self.transaction.inner() {
596            Some(Transaction {
597                pcx: _,
598                ops: TransactionOps::Peeks { determination, .. },
599                write_lock_guards: _,
600                access: _,
601                id: _,
602            }) => Some(determination.clone()),
603            _ => None,
604        }
605    }
606
607    /// Whether this session has a timestamp for a read transaction.
608    pub fn contains_read_timestamp(&self) -> bool {
609        matches!(
610            self.transaction.inner(),
611            Some(Transaction {
612                pcx: _,
613                ops: TransactionOps::Peeks {
614                    determination: TimestampDetermination {
615                        timestamp_context: TimestampContext::TimelineTimestamp { .. },
616                        ..
617                    },
618                    ..
619                },
620                write_lock_guards: _,
621                access: _,
622                id: _,
623            })
624        )
625    }
626
627    /// Registers the prepared statement under `name`.
628    pub fn set_prepared_statement(
629        &mut self,
630        name: String,
631        stmt: Option<Statement<Raw>>,
632        raw_sql: String,
633        desc: StatementDesc,
634        catalog_revision: u64,
635        now: EpochMillis,
636    ) {
637        let logging = PreparedStatementLoggingInfo::still_to_log(
638            raw_sql,
639            stmt.as_ref(),
640            now,
641            name.clone(),
642            self.uuid,
643            false,
644        );
645        let statement = PreparedStatement {
646            stmt,
647            desc,
648            catalog_revision,
649            logging: Arc::new(QCell::new(&self.qcell_owner, logging)),
650        };
651        self.prepared_statements.insert(name, statement);
652    }
653
654    /// Removes the prepared statement associated with `name`.
655    ///
656    /// Returns whether a statement previously existed.
657    pub fn remove_prepared_statement(&mut self, name: &str) -> bool {
658        self.prepared_statements.remove(name).is_some()
659    }
660
661    /// Removes all prepared statements.
662    pub fn remove_all_prepared_statements(&mut self) {
663        self.prepared_statements.clear();
664    }
665
666    /// Retrieves the prepared statement associated with `name`.
667    ///
668    /// This is unverified and could be incorrect if the underlying catalog has
669    /// changed.
670    pub fn get_prepared_statement_unverified(&self, name: &str) -> Option<&PreparedStatement> {
671        self.prepared_statements.get(name)
672    }
673
674    /// Retrieves the prepared statement associated with `name`.
675    ///
676    /// This is unverified and could be incorrect if the underlying catalog has
677    /// changed.
678    pub fn get_prepared_statement_mut_unverified(
679        &mut self,
680        name: &str,
681    ) -> Option<&mut PreparedStatement> {
682        self.prepared_statements.get_mut(name)
683    }
684
685    /// Returns the prepared statements for the session.
686    pub fn prepared_statements(&self) -> &BTreeMap<String, PreparedStatement> {
687        &self.prepared_statements
688    }
689
690    /// Binds the specified portal to the specified prepared statement.
691    ///
692    /// If the prepared statement contains parameters, the values and types of
693    /// those parameters must be provided in `params`. It is the caller's
694    /// responsibility to ensure that the correct number of parameters is
695    /// provided.
696    ///
697    /// The `results_formats` parameter sets the desired format of the results,
698    /// and is stored on the portal.
699    pub fn set_portal(
700        &mut self,
701        portal_name: String,
702        desc: StatementDesc,
703        stmt: Option<Statement<Raw>>,
704        logging: Arc<QCell<PreparedStatementLoggingInfo>>,
705        params: Vec<(Datum, ScalarType)>,
706        result_formats: Vec<Format>,
707        catalog_revision: u64,
708    ) -> Result<(), AdapterError> {
709        // The empty portal can be silently replaced.
710        if !portal_name.is_empty() && self.portals.contains_key(&portal_name) {
711            return Err(AdapterError::DuplicateCursor(portal_name));
712        }
713        let param_types = desc.param_types.clone();
714        self.portals.insert(
715            portal_name,
716            Portal {
717                stmt: stmt.map(Arc::new),
718                desc,
719                catalog_revision,
720                parameters: Params {
721                    datums: Row::pack(params.iter().map(|(d, _t)| d)),
722                    execute_types: params.into_iter().map(|(_d, t)| t).collect(),
723                    expected_types: param_types,
724                },
725                result_formats,
726                state: PortalState::NotStarted,
727                logging,
728                lifecycle_timestamps: None,
729            },
730        );
731        Ok(())
732    }
733
734    /// Removes the specified portal.
735    ///
736    /// If there is no such portal, this method does nothing. Returns whether that portal existed.
737    pub fn remove_portal(&mut self, portal_name: &str) -> bool {
738        self.portals.remove(portal_name).is_some()
739    }
740
741    /// Retrieves a reference to the specified portal.
742    ///
743    /// If there is no such portal, returns `None`.
744    pub fn get_portal_unverified(&self, portal_name: &str) -> Option<&Portal> {
745        self.portals.get(portal_name)
746    }
747
748    /// Retrieves a mutable reference to the specified portal.
749    ///
750    /// If there is no such portal, returns `None`.
751    pub fn get_portal_unverified_mut(&mut self, portal_name: &str) -> Option<&mut Portal> {
752        self.portals.get_mut(portal_name)
753    }
754
755    /// Creates and installs a new portal.
756    pub fn create_new_portal(
757        &mut self,
758        stmt: Option<Statement<Raw>>,
759        logging: Arc<QCell<PreparedStatementLoggingInfo>>,
760        desc: StatementDesc,
761        parameters: Params,
762        result_formats: Vec<Format>,
763        catalog_revision: u64,
764    ) -> Result<String, AdapterError> {
765        // See: https://github.com/postgres/postgres/blob/84f5c2908dad81e8622b0406beea580e40bb03ac/src/backend/utils/mmgr/portalmem.c#L234
766
767        for i in 0usize.. {
768            let name = format!("<unnamed portal {}>", i);
769            match self.portals.entry(name.clone()) {
770                Entry::Occupied(_) => continue,
771                Entry::Vacant(entry) => {
772                    entry.insert(Portal {
773                        stmt: stmt.map(Arc::new),
774                        desc,
775                        catalog_revision,
776                        parameters,
777                        result_formats,
778                        state: PortalState::NotStarted,
779                        logging,
780                        lifecycle_timestamps: None,
781                    });
782                    return Ok(name);
783                }
784            }
785        }
786
787        coord_bail!("unable to create a new portal");
788    }
789
790    /// Resets the session to its initial state. Returns sinks that need to be
791    /// dropped.
792    pub fn reset(&mut self) {
793        let _ = self.clear_transaction();
794        self.prepared_statements.clear();
795        self.vars.reset_all();
796    }
797
798    /// Returns the [application_name] that created this session.
799    ///
800    /// [application_name]: (https://www.postgresql.org/docs/current/runtime-config-logging.html#GUC-APPLICATION-NAME)
801    pub fn application_name(&self) -> &str {
802        self.vars.application_name()
803    }
804
805    /// Returns a reference to the variables in this session.
806    pub fn vars(&self) -> &SessionVars {
807        &self.vars
808    }
809
810    /// Returns a mutable reference to the variables in this session.
811    pub fn vars_mut(&mut self) -> &mut SessionVars {
812        &mut self.vars
813    }
814
815    /// Grants a set of write locks to this session's inner [`Transaction`].
816    ///
817    /// # Panics
818    /// If the inner transaction is idle. See [`TransactionStatus::try_grant_write_locks`].
819    ///
820    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
821        self.transaction.try_grant_write_locks(guards)
822    }
823
824    /// Drains any external metadata updates and applies the changes from the latest update.
825    pub fn apply_external_metadata_updates(&mut self) {
826        // If no sender is registered then there isn't anything to do.
827        let Some(rx) = &mut self.external_metadata_rx else {
828            return;
829        };
830
831        // If the value hasn't changed then return.
832        if !rx.has_changed().unwrap_or(false) {
833            return;
834        }
835
836        // Update our metadata! Note the short critical section (just a clone) to avoid blocking
837        // the sending side of this watch channel.
838        let metadata = rx.borrow_and_update().clone();
839        self.vars.set_external_user_metadata(metadata);
840    }
841
842    /// Initializes the session's role metadata.
843    pub fn initialize_role_metadata(&mut self, role_id: RoleId) {
844        self.role_metadata = Some(RoleMetadata::new(role_id));
845    }
846
847    /// Ensures that a timestamp oracle exists for `timeline` and returns a mutable reference to
848    /// the timestamp oracle.
849    pub fn ensure_timestamp_oracle(
850        &mut self,
851        timeline: Timeline,
852    ) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
853        self.session_oracles
854            .entry(timeline)
855            .or_insert_with(|| InMemoryTimestampOracle::new(T::minimum(), NowFn::from(T::minimum)))
856    }
857
858    /// Ensures that a timestamp oracle exists for reads and writes from/to a local input and
859    /// returns a mutable reference to the timestamp oracle.
860    pub fn ensure_local_timestamp_oracle(&mut self) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
861        self.ensure_timestamp_oracle(Timeline::EpochMilliseconds)
862    }
863
864    /// Returns a reference to the timestamp oracle for `timeline`.
865    pub fn get_timestamp_oracle(
866        &self,
867        timeline: &Timeline,
868    ) -> Option<&InMemoryTimestampOracle<T, NowFn<T>>> {
869        self.session_oracles.get(timeline)
870    }
871
872    /// If the current session is using the Strong Session Serializable isolation level advance the
873    /// session local timestamp oracle to `write_ts`.
874    pub fn apply_write(&mut self, timestamp: T) {
875        if self.vars().transaction_isolation() == &IsolationLevel::StrongSessionSerializable {
876            self.ensure_local_timestamp_oracle().apply_write(timestamp);
877        }
878    }
879
880    /// Returns the [`SessionMetrics`] instance associated with this [`Session`].
881    pub fn metrics(&self) -> &SessionMetrics {
882        &self.metrics
883    }
884
885    /// Sets the `BuiltinTableAppendNotify` for this session.
886    pub fn set_builtin_table_updates(&mut self, fut: BuiltinTableAppendNotify) {
887        let prev = self.builtin_updates.replace(fut);
888        mz_ore::soft_assert_or_log!(prev.is_none(), "replacing old builtin table notify");
889    }
890
891    /// Takes the stashed `BuiltinTableAppendNotify`, if one exists, and returns a [`Future`] that
892    /// waits for the writes to complete.
893    pub fn clear_builtin_table_updates(&mut self) -> Option<impl Future<Output = ()> + 'static> {
894        if let Some(fut) = self.builtin_updates.take() {
895            // Record how long we blocked for, if we blocked at all.
896            let histogram = self
897                .metrics()
898                .session_startup_table_writes_seconds()
899                .clone();
900            Some(async move {
901                fut.wall_time().observe(histogram).await;
902            })
903        } else {
904            None
905        }
906    }
907}
908
/// A prepared statement.
///
/// The `logging` handle is excluded from the derived `Debug` output.
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct PreparedStatement {
    /// The parsed statement, or `None` if the prepared statement is the empty query.
    stmt: Option<Statement<Raw>>,
    /// Description of the statement.
    desc: StatementDesc,
    /// The most recent catalog revision that has verified this statement.
    pub catalog_revision: u64,
    /// Shared handle to the metadata used for statement logging.
    #[derivative(Debug = "ignore")]
    logging: Arc<QCell<PreparedStatementLoggingInfo>>,
}
920
921impl PreparedStatement {
922    /// Returns the AST associated with this prepared statement,
923    /// if the prepared statement was not the empty query.
924    pub fn stmt(&self) -> Option<&Statement<Raw>> {
925        self.stmt.as_ref()
926    }
927
928    /// Returns the description of the prepared statement.
929    pub fn desc(&self) -> &StatementDesc {
930        &self.desc
931    }
932
933    /// Returns a handle to the metadata for statement logging.
934    pub fn logging(&self) -> &Arc<QCell<PreparedStatementLoggingInfo>> {
935        &self.logging
936    }
937}
938
/// A portal represents the execution state of a running or runnable query.
///
/// The `logging` and `state` fields are excluded from the derived `Debug` output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Portal {
    /// The statement that is bound to this portal.
    // NOTE(review): `None` presumably corresponds to the empty query, as with
    // `PreparedStatement::stmt` — confirm.
    pub stmt: Option<Arc<Statement<Raw>>>,
    /// The statement description.
    pub desc: StatementDesc,
    /// The most recent catalog revision that has verified this statement.
    pub catalog_revision: u64,
    /// The bound values for the parameters in the prepared statement, if any.
    pub parameters: Params,
    /// The desired output format for each column in the result set.
    pub result_formats: Vec<Format>,
    /// A handle to metadata needed for statement logging.
    #[derivative(Debug = "ignore")]
    pub logging: Arc<QCell<PreparedStatementLoggingInfo>>,
    /// The execution state of the portal.
    #[derivative(Debug = "ignore")]
    pub state: PortalState,
    /// Statement lifecycle timestamps coming from `mz-pgwire`.
    pub lifecycle_timestamps: Option<LifecycleTimestamps>,
}
962
/// Execution states of a portal.
///
/// This type has no `Debug` impl here; `Portal` excludes its `state` field from `Debug` output.
pub enum PortalState {
    /// Portal not yet started.
    NotStarted,
    /// Portal is a rows-returning statement in progress with 0 or more rows
    /// remaining.
    // NOTE(review): the meaning of `InProgress(None)` is not visible here; presumably the
    // rows have been temporarily taken out while being sent — confirm.
    InProgress(Option<InProgressRows>),
    /// Portal has completed and should not be re-executed. If the optional string
    /// is present, it is returned as a CommandComplete tag; otherwise, an error
    /// is sent.
    Completed(Option<String>),
}
975
/// State of an in-progress, rows-returning portal.
pub struct InProgressRows {
    /// The current batch of rows, or `None` when no batch is buffered.
    pub current: Option<Box<dyn RowIterator + Send + Sync>>,
    /// A stream from which to fetch more row batches.
    pub remaining: RecordFirstRowStream,
}
983
984impl InProgressRows {
985    /// Creates a new InProgressRows from a batch stream.
986    pub fn new(remaining: RecordFirstRowStream) -> Self {
987        Self {
988            current: None,
989            remaining,
990        }
991    }
992
993    /// Determines whether the underlying stream has ended and there are also no more rows
994    /// stashed in `current`.
995    pub fn no_more_rows(&self) -> bool {
996        self.remaining.no_more_rows && self.current.is_none()
997    }
998}
999
/// A channel of batched rows: the unbounded receiving end for [`PeekResponseUnary`] messages.
pub type RowBatchStream = UnboundedReceiver<PeekResponseUnary>;
1002
/// Part of the statement lifecycle: timestamps that come from the Adapter frontend
/// (`mz-pgwire`) part of the lifecycle.
#[derive(Debug, Clone)]
pub struct LifecycleTimestamps {
    /// When the query was received. More specifically, when the tokio recv returned.
    /// For a Simple Query, this is for the whole query. For the Extended Query flow, this is
    /// only for `FrontendMessage::Execute`. (This means that this is after parsing for the
    /// Extended Query flow.)
    pub received: EpochMillis,
}
1013
1014impl LifecycleTimestamps {
1015    /// Creates a new `LifecycleTimestamps`.
1016    pub fn new(received: EpochMillis) -> Self {
1017        Self { received }
1018    }
1019}
1020
/// The transaction status of a session.
///
/// PostgreSQL's transaction states are in backend/access/transam/xact.c.
#[derive(Debug)]
pub enum TransactionStatus<T> {
    /// Idle. Matches `TBLOCK_DEFAULT`.
    Default,
    /// Running a single-query transaction. Matches
    /// `TBLOCK_STARTED`. In PostgreSQL, when using the extended query protocol, this
    /// may be upgraded into a multi-statement implicit query (see [`Self::InTransactionImplicit`]).
    /// Additionally, some statements may trigger an eager commit of the implicit transaction,
    /// see: <https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=f92944137>. In
    /// Materialize, however, we eagerly commit all statements outside of an explicit transaction
    /// when using the extended query protocol. Therefore, we can guarantee that this state will
    /// always be a single-query transaction and never be upgraded into a multi-statement implicit
    /// query.
    Started(Transaction<T>),
    /// Currently in a transaction issued from a `BEGIN`. Matches `TBLOCK_INPROGRESS`.
    InTransaction(Transaction<T>),
    /// Currently in an implicit transaction started from a multi-statement query
    /// with more than one statement. Matches `TBLOCK_IMPLICIT_INPROGRESS`.
    InTransactionImplicit(Transaction<T>),
    /// In a failed transaction. Matches `TBLOCK_ABORT`.
    Failed(Transaction<T>),
}
1046
1047impl<T: TimestampManipulation> TransactionStatus<T> {
1048    /// Extracts the inner transaction ops and write lock guard if not failed.
1049    pub fn into_ops_and_lock_guard(self) -> (Option<TransactionOps<T>>, Option<WriteLocks>) {
1050        match self {
1051            TransactionStatus::Default | TransactionStatus::Failed(_) => (None, None),
1052            TransactionStatus::Started(txn)
1053            | TransactionStatus::InTransaction(txn)
1054            | TransactionStatus::InTransactionImplicit(txn) => {
1055                (Some(txn.ops), txn.write_lock_guards)
1056            }
1057        }
1058    }
1059
1060    /// Exposes the inner transaction.
1061    pub fn inner(&self) -> Option<&Transaction<T>> {
1062        match self {
1063            TransactionStatus::Default => None,
1064            TransactionStatus::Started(txn)
1065            | TransactionStatus::InTransaction(txn)
1066            | TransactionStatus::InTransactionImplicit(txn)
1067            | TransactionStatus::Failed(txn) => Some(txn),
1068        }
1069    }
1070
1071    /// Exposes the inner transaction.
1072    pub fn inner_mut(&mut self) -> Option<&mut Transaction<T>> {
1073        match self {
1074            TransactionStatus::Default => None,
1075            TransactionStatus::Started(txn)
1076            | TransactionStatus::InTransaction(txn)
1077            | TransactionStatus::InTransactionImplicit(txn)
1078            | TransactionStatus::Failed(txn) => Some(txn),
1079        }
1080    }
1081
1082    /// Whether the transaction's ops are DDL.
1083    pub fn is_ddl(&self) -> bool {
1084        match self {
1085            TransactionStatus::Default => false,
1086            TransactionStatus::Started(txn)
1087            | TransactionStatus::InTransaction(txn)
1088            | TransactionStatus::InTransactionImplicit(txn)
1089            | TransactionStatus::Failed(txn) => {
1090                matches!(txn.ops, TransactionOps::DDL { .. })
1091            }
1092        }
1093    }
1094
1095    /// Expresses whether or not the transaction was implicitly started.
1096    /// However, its negation does not imply explicitly started.
1097    pub fn is_implicit(&self) -> bool {
1098        match self {
1099            TransactionStatus::Started(_) | TransactionStatus::InTransactionImplicit(_) => true,
1100            TransactionStatus::Default
1101            | TransactionStatus::InTransaction(_)
1102            | TransactionStatus::Failed(_) => false,
1103        }
1104    }
1105
1106    /// Whether the transaction may contain multiple statements.
1107    pub fn is_in_multi_statement_transaction(&self) -> bool {
1108        match self {
1109            TransactionStatus::InTransaction(_) | TransactionStatus::InTransactionImplicit(_) => {
1110                true
1111            }
1112            TransactionStatus::Default
1113            | TransactionStatus::Started(_)
1114            | TransactionStatus::Failed(_) => false,
1115        }
1116    }
1117
1118    /// Whether the transaction is in a multi-statement, immediate transaction.
1119    pub fn in_immediate_multi_stmt_txn(&self, when: &QueryWhen) -> bool {
1120        self.is_in_multi_statement_transaction() && when == &QueryWhen::Immediately
1121    }
1122
1123    /// Grants the writes lock to the inner transaction, returning an error if the transaction
1124    /// has already been granted write locks.
1125    ///
1126    /// # Panics
1127    /// If `self` is `TransactionStatus::Default`, which indicates that the
1128    /// transaction is idle, which is not appropriate to assign the
1129    /// coordinator's write lock to.
1130    ///
1131    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1132        match self {
1133            TransactionStatus::Default => panic!("cannot grant write lock to txn not yet started"),
1134            TransactionStatus::Started(txn)
1135            | TransactionStatus::InTransaction(txn)
1136            | TransactionStatus::InTransactionImplicit(txn)
1137            | TransactionStatus::Failed(txn) => txn.try_grant_write_locks(guards),
1138        }
1139    }
1140
1141    /// Returns the currently held [`WriteLocks`], if this transaction holds any.
1142    pub fn write_locks(&self) -> Option<&WriteLocks> {
1143        match self {
1144            TransactionStatus::Default => None,
1145            TransactionStatus::Started(txn)
1146            | TransactionStatus::InTransaction(txn)
1147            | TransactionStatus::InTransactionImplicit(txn)
1148            | TransactionStatus::Failed(txn) => txn.write_lock_guards.as_ref(),
1149        }
1150    }
1151
1152    /// The timeline of the transaction, if one exists.
1153    pub fn timeline(&self) -> Option<Timeline> {
1154        match self {
1155            TransactionStatus::Default => None,
1156            TransactionStatus::Started(txn)
1157            | TransactionStatus::InTransaction(txn)
1158            | TransactionStatus::InTransactionImplicit(txn)
1159            | TransactionStatus::Failed(txn) => txn.timeline(),
1160        }
1161    }
1162
1163    /// The cluster of the transaction, if one exists.
1164    pub fn cluster(&self) -> Option<ClusterId> {
1165        match self {
1166            TransactionStatus::Default => None,
1167            TransactionStatus::Started(txn)
1168            | TransactionStatus::InTransaction(txn)
1169            | TransactionStatus::InTransactionImplicit(txn)
1170            | TransactionStatus::Failed(txn) => txn.cluster(),
1171        }
1172    }
1173
1174    /// Snapshot of the catalog that reflects DDL operations run in this transaction.
1175    pub fn catalog_state(&self) -> Option<&CatalogState> {
1176        match self.inner() {
1177            Some(Transaction {
1178                ops: TransactionOps::DDL { state, .. },
1179                ..
1180            }) => Some(state),
1181            _ => None,
1182        }
1183    }
1184
1185    /// Reports whether any operations have been executed as part of this transaction
1186    pub fn contains_ops(&self) -> bool {
1187        match self.inner() {
1188            Some(txn) => txn.contains_ops(),
1189            None => false,
1190        }
1191    }
1192
1193    /// Checks whether the current state of this transaction allows writes
1194    /// (adding write ops).
1195    /// transaction
1196    pub fn allows_writes(&self) -> bool {
1197        match self {
1198            TransactionStatus::Started(Transaction { ops, access, .. })
1199            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1200            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1201                match ops {
1202                    TransactionOps::None => access != &Some(TransactionAccessMode::ReadOnly),
1203                    TransactionOps::Peeks { determination, .. } => {
1204                        // If-and-only-if peeks thus far do not have a timestamp
1205                        // (i.e. they are constant), we can switch to a write
1206                        // transaction.
1207                        !determination.timestamp_context.contains_timestamp()
1208                    }
1209                    TransactionOps::Subscribe => false,
1210                    TransactionOps::Writes(_) => true,
1211                    TransactionOps::SingleStatement { .. } => false,
1212                    TransactionOps::DDL { .. } => false,
1213                }
1214            }
1215            TransactionStatus::Default | TransactionStatus::Failed(_) => {
1216                unreachable!()
1217            }
1218        }
1219    }
1220
1221    /// Adds operations to the current transaction. An error is produced if they cannot be merged
1222    /// (i.e., a timestamp-dependent read cannot be merged to an insert).
1223    ///
1224    /// The `DDL` variant is an exception and does not merge operations, but instead overwrites the
1225    /// old ops with the new ops. This is correct because it is only used in conjunction with the
1226    /// Dry Run catalog op which returns an error containing all of the ops, and those ops are
1227    /// passed to this function which then overwrites.
1228    ///
1229    /// # Panics
1230    /// If the operations are compatible but the operation metadata doesn't match. Such as reads at
1231    /// different timestamps, reads on different timelines, reads on different clusters, etc. It's
1232    /// up to the caller to make sure these are aligned.
1233    pub fn add_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
1234        match self {
1235            TransactionStatus::Started(Transaction { ops, access, .. })
1236            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1237            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1238                match ops {
1239                    TransactionOps::None => {
1240                        if matches!(access, Some(TransactionAccessMode::ReadOnly))
1241                            && matches!(add_ops, TransactionOps::Writes(_))
1242                        {
1243                            return Err(AdapterError::ReadOnlyTransaction);
1244                        }
1245                        *ops = add_ops;
1246                    }
1247                    TransactionOps::Peeks {
1248                        determination,
1249                        cluster_id,
1250                        requires_linearization,
1251                    } => match add_ops {
1252                        TransactionOps::Peeks {
1253                            determination: add_timestamp_determination,
1254                            cluster_id: add_cluster_id,
1255                            requires_linearization: add_requires_linearization,
1256                        } => {
1257                            assert_eq!(*cluster_id, add_cluster_id);
1258                            match (
1259                                &determination.timestamp_context,
1260                                &add_timestamp_determination.timestamp_context,
1261                            ) {
1262                                (
1263                                    TimestampContext::TimelineTimestamp {
1264                                        timeline: txn_timeline,
1265                                        chosen_ts: txn_ts,
1266                                        oracle_ts: _,
1267                                    },
1268                                    TimestampContext::TimelineTimestamp {
1269                                        timeline: add_timeline,
1270                                        chosen_ts: add_ts,
1271                                        oracle_ts: _,
1272                                    },
1273                                ) => {
1274                                    assert_eq!(txn_timeline, add_timeline);
1275                                    assert_eq!(txn_ts, add_ts);
1276                                }
1277                                (TimestampContext::NoTimestamp, _) => {
1278                                    *determination = add_timestamp_determination
1279                                }
1280                                (_, TimestampContext::NoTimestamp) => {}
1281                            };
1282                            if matches!(requires_linearization, RequireLinearization::NotRequired)
1283                                && matches!(
1284                                    add_requires_linearization,
1285                                    RequireLinearization::Required
1286                                )
1287                            {
1288                                *requires_linearization = add_requires_linearization;
1289                            }
1290                        }
1291                        // Iff peeks thus far do not have a timestamp (i.e.
1292                        // they are constant), we can switch to a write
1293                        // transaction.
1294                        writes @ TransactionOps::Writes(..)
1295                            if !determination.timestamp_context.contains_timestamp() =>
1296                        {
1297                            *ops = writes;
1298                        }
1299                        _ => return Err(AdapterError::ReadOnlyTransaction),
1300                    },
1301                    TransactionOps::Subscribe => {
1302                        return Err(AdapterError::SubscribeOnlyTransaction);
1303                    }
1304                    TransactionOps::Writes(txn_writes) => match add_ops {
1305                        TransactionOps::Writes(mut add_writes) => {
1306                            // We should have already checked the access above, but make sure we don't miss
1307                            // it anyway.
1308                            assert!(!matches!(access, Some(TransactionAccessMode::ReadOnly)));
1309                            txn_writes.append(&mut add_writes);
1310                        }
1311                        // Iff peeks do not have a timestamp (i.e. they are
1312                        // constant), we can permit them.
1313                        TransactionOps::Peeks { determination, .. }
1314                            if !determination.timestamp_context.contains_timestamp() => {}
1315                        _ => {
1316                            return Err(AdapterError::WriteOnlyTransaction);
1317                        }
1318                    },
1319                    TransactionOps::SingleStatement { .. } => {
1320                        return Err(AdapterError::SingleStatementTransaction);
1321                    }
1322                    TransactionOps::DDL {
1323                        ops: og_ops,
1324                        revision: og_revision,
1325                        state: og_state,
1326                    } => match add_ops {
1327                        TransactionOps::DDL {
1328                            ops: new_ops,
1329                            revision: new_revision,
1330                            state: new_state,
1331                        } => {
1332                            if *og_revision != new_revision {
1333                                return Err(AdapterError::DDLTransactionRace);
1334                            }
1335                            // The old og_ops are overwritten, not extended.
1336                            if !new_ops.is_empty() {
1337                                *og_ops = new_ops;
1338                                *og_state = new_state;
1339                            }
1340                        }
1341                        _ => return Err(AdapterError::DDLOnlyTransaction),
1342                    },
1343                }
1344            }
1345            TransactionStatus::Default | TransactionStatus::Failed(_) => {
1346                unreachable!()
1347            }
1348        }
1349        Ok(())
1350    }
1351}
1352
/// An abstraction allowing us to identify different transactions.
///
/// IDs are unique only per connection (see `Transaction::id`).
pub type TransactionId = u64;
1355
1356impl<T> Default for TransactionStatus<T> {
1357    fn default() -> Self {
1358        TransactionStatus::Default
1359    }
1360}
1361
/// State data for transactions.
#[derive(Debug)]
pub struct Transaction<T> {
    /// Plan context.
    pub pcx: PlanContext,
    /// Transaction operations.
    pub ops: TransactionOps<T>,
    /// Uniquely identifies the transaction on a per connection basis.
    /// Two transactions started from separate connections may share the
    /// same ID.
    /// If all IDs have been exhausted, this will wrap around back to 0.
    pub id: TransactionId,
    /// Locks for objects this transaction will operate on. `None` until granted via
    /// `try_grant_write_locks`, which refuses to replace locks that are already held.
    write_lock_guards: Option<WriteLocks>,
    /// Access mode (read only, read write). `None` when no mode was specified; `allows_writes`
    /// treats that like read-write.
    access: Option<TransactionAccessMode>,
}
1379
1380impl<T> Transaction<T> {
1381    /// Tries to grant the write lock to this transaction for the remainder of its lifetime. Errors
1382    /// if this [`Transaction`] has already been granted write locks.
1383    fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1384        match &mut self.write_lock_guards {
1385            Some(existing) => Err(existing),
1386            locks @ None => {
1387                *locks = Some(guards);
1388                Ok(())
1389            }
1390        }
1391    }
1392
1393    /// The timeline of the transaction, if one exists.
1394    fn timeline(&self) -> Option<Timeline> {
1395        match &self.ops {
1396            TransactionOps::Peeks {
1397                determination:
1398                    TimestampDetermination {
1399                        timestamp_context: TimestampContext::TimelineTimestamp { timeline, .. },
1400                        ..
1401                    },
1402                ..
1403            } => Some(timeline.clone()),
1404            TransactionOps::Peeks { .. }
1405            | TransactionOps::None
1406            | TransactionOps::Subscribe
1407            | TransactionOps::Writes(_)
1408            | TransactionOps::SingleStatement { .. }
1409            | TransactionOps::DDL { .. } => None,
1410        }
1411    }
1412
1413    /// The cluster of the transaction, if one exists.
1414    pub fn cluster(&self) -> Option<ClusterId> {
1415        match &self.ops {
1416            TransactionOps::Peeks { cluster_id, .. } => Some(cluster_id.clone()),
1417            TransactionOps::None
1418            | TransactionOps::Subscribe
1419            | TransactionOps::Writes(_)
1420            | TransactionOps::SingleStatement { .. }
1421            | TransactionOps::DDL { .. } => None,
1422        }
1423    }
1424
1425    /// Reports whether any operations have been executed as part of this transaction
1426    fn contains_ops(&self) -> bool {
1427        !matches!(self.ops, TransactionOps::None)
1428    }
1429}
1430
/// A transaction's status code.
///
/// Each code converts to a single byte (`I`, `T`, or `E`) via its `From` impls for `u8`
/// and `String`.
#[derive(Debug, Clone, Copy)]
pub enum TransactionCode {
    /// Not currently in a transaction.
    Idle,
    /// Currently in a transaction.
    InTransaction,
    /// Currently in a transaction block which has failed.
    Failed,
}
1441
1442impl From<TransactionCode> for u8 {
1443    fn from(code: TransactionCode) -> Self {
1444        match code {
1445            TransactionCode::Idle => b'I',
1446            TransactionCode::InTransaction => b'T',
1447            TransactionCode::Failed => b'E',
1448        }
1449    }
1450}
1451
1452impl From<TransactionCode> for String {
1453    fn from(code: TransactionCode) -> Self {
1454        char::from(u8::from(code)).to_string()
1455    }
1456}
1457
1458impl<T> From<&TransactionStatus<T>> for TransactionCode {
1459    /// Convert from the Session's version
1460    fn from(status: &TransactionStatus<T>) -> TransactionCode {
1461        match status {
1462            TransactionStatus::Default => TransactionCode::Idle,
1463            TransactionStatus::Started(_) => TransactionCode::InTransaction,
1464            TransactionStatus::InTransaction(_) => TransactionCode::InTransaction,
1465            TransactionStatus::InTransactionImplicit(_) => TransactionCode::InTransaction,
1466            TransactionStatus::Failed(_) => TransactionCode::Failed,
1467        }
1468    }
1469}
1470
/// The type of operation being performed by the transaction.
///
/// This is needed because we currently do not allow mixing reads and writes in
/// a transaction. Use this to record what we have done, and what may need to
/// happen at commit.
#[derive(Debug)]
pub enum TransactionOps<T> {
    /// The transaction has been initiated, but no statement has yet been executed
    /// in it.
    None,
    /// This transaction has had a peek (`SELECT`, `SUBSCRIBE`). If the inner value
    /// has a timestamp, it must only do other peeks. However, if it doesn't
    /// have a timestamp (i.e. the values are constants), the transaction can still
    /// perform writes.
    // NOTE(review): `SUBSCRIBE` also has its own `Subscribe` variant below; the mention here
    // may be stale — confirm.
    Peeks {
        /// The timestamp and timestamp related metadata for the peek.
        determination: TimestampDetermination<T>,
        /// The cluster used to execute peeks.
        cluster_id: ClusterId,
        /// Whether this peek needs to be linearized.
        requires_linearization: RequireLinearization,
    },
    /// This transaction has done a `SUBSCRIBE` and must do nothing else.
    Subscribe,
    /// This transaction has had a write (`INSERT`, `UPDATE`, `DELETE`) and must
    /// only do other writes, or reads whose timestamp is None (i.e. constants).
    Writes(Vec<WriteOp>),
    /// This transaction has a prospective statement that will execute during commit.
    SingleStatement {
        /// The prospective statement.
        stmt: Arc<Statement<Raw>>,
        /// The statement params.
        params: mz_sql::plan::Params,
    },
    /// This transaction has run some _simple_ DDL and must do nothing else. Any statement/plan that
    /// uses this must return false in `must_serialize_ddl()` because this is serialized instead in
    /// `sequence_plan()` during `COMMIT`.
    DDL {
        /// Catalog operations that have already run, and must run before each subsequent op.
        ops: Vec<crate::catalog::Op>,
        /// In-memory state that reflects the previously applied ops.
        state: CatalogState,
        /// Transient revision of the `Catalog` when this transaction started.
        revision: u64,
    },
}
1517
1518impl<T> TransactionOps<T> {
1519    fn timestamp_determination(self) -> Option<TimestampDetermination<T>> {
1520        match self {
1521            TransactionOps::Peeks { determination, .. } => Some(determination),
1522            TransactionOps::None
1523            | TransactionOps::Subscribe
1524            | TransactionOps::Writes(_)
1525            | TransactionOps::SingleStatement { .. }
1526            | TransactionOps::DDL { .. } => None,
1527        }
1528    }
1529}
1530
1531impl<T> Default for TransactionOps<T> {
1532    fn default() -> Self {
1533        Self::None
1534    }
1535}
1536
/// An `INSERT` waiting to be committed.
// NOTE(review): `TransactionOps::Writes` (which holds these) also mentions `UPDATE` and
// `DELETE`; confirm whether those are represented as `WriteOp`s too and update this summary.
#[derive(Debug, Clone, PartialEq)]
pub struct WriteOp {
    /// The target table.
    pub id: CatalogItemId,
    /// The data rows.
    pub rows: TableData,
}
1545
/// Whether a transaction requires linearization.
///
/// Can be derived from an `ExplainContext` via its `From` impl. When peeks are merged in
/// `TransactionStatus::add_ops`, `Required` wins over `NotRequired`.
#[derive(Debug)]
pub enum RequireLinearization {
    /// Linearization is required.
    Required,
    /// Linearization is not required.
    NotRequired,
}
1554
1555impl From<&ExplainContext> for RequireLinearization {
1556    fn from(ctx: &ExplainContext) -> Self {
1557        match ctx {
1558            ExplainContext::None | ExplainContext::PlanInsightsNotice(_) => {
1559                RequireLinearization::Required
1560            }
1561            _ => RequireLinearization::NotRequired,
1562        }
1563    }
1564}
1565
/// A complete set of exclusive locks for writing to collections identified by [`CatalogItemId`]s.
///
/// To prevent deadlocks between two sessions, we do not allow acquiring a partial set of locks.
/// Build one with [`WriteLocks::builder`] and [`WriteLocksBuilder::all_or_nothing`]. A non-empty
/// set logs when it is dropped.
#[derive(Debug)]
pub struct WriteLocks {
    /// One held mutex guard per collection covered by this set, keyed by collection id.
    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
    /// Connection that currently holds these locks, used for tracing purposes only.
    conn_id: ConnectionId,
}
1575
1576impl WriteLocks {
1577    /// Create a [`WriteLocksBuilder`] pre-defining all of the locks we need.
1578    ///
1579    /// When "finishing" the builder with [`WriteLocksBuilder::all_or_nothing`], if we haven't
1580    /// acquired all of the necessary locks we drop any partially acquired ones.
1581    pub fn builder(sources: impl IntoIterator<Item = CatalogItemId>) -> WriteLocksBuilder {
1582        let locks = sources.into_iter().map(|gid| (gid, None)).collect();
1583        WriteLocksBuilder { locks }
1584    }
1585
1586    /// Validate this set of [`WriteLocks`] is sufficient for the provided collections.
1587    /// Dropping the currently held locks if it's not.
1588    pub fn validate(
1589        self,
1590        collections: impl Iterator<Item = CatalogItemId>,
1591    ) -> Result<Self, BTreeSet<CatalogItemId>> {
1592        let mut missing = BTreeSet::new();
1593        for collection in collections {
1594            if !self.locks.contains_key(&collection) {
1595                missing.insert(collection);
1596            }
1597        }
1598
1599        if missing.is_empty() {
1600            Ok(self)
1601        } else {
1602            // Explicitly drop the already acquired locks.
1603            drop(self);
1604            Err(missing)
1605        }
1606    }
1607}
1608
1609impl Drop for WriteLocks {
1610    fn drop(&mut self) {
1611        // We may have merged the locks into GroupCommitWriteLocks, thus it could be empty.
1612        if !self.locks.is_empty() {
1613            tracing::info!(
1614                conn_id = %self.conn_id,
1615                locks = ?self.locks,
1616                "dropping write locks",
1617            );
1618        }
1619    }
1620}
1621
/// A builder struct that helps us acquire all of the locks we need, or none of them.
///
/// See [`WriteLocks::builder`].
#[derive(Debug)]
pub struct WriteLocksBuilder {
    /// Every collection we need a lock for. The value is `None` until a guard is supplied via
    /// [`WriteLocksBuilder::insert_lock`], and `Some` afterwards.
    locks: BTreeMap<CatalogItemId, Option<tokio::sync::OwnedMutexGuard<()>>>,
}
1629
1630impl WriteLocksBuilder {
1631    /// Adds a lock to this builder.
1632    pub fn insert_lock(&mut self, id: CatalogItemId, lock: tokio::sync::OwnedMutexGuard<()>) {
1633        self.locks.insert(id, Some(lock));
1634    }
1635
1636    /// Finish this builder by returning either all of the necessary locks, or none of them.
1637    ///
1638    /// If we fail to acquire all of the locks, returns one of the [`CatalogItemId`]s that we
1639    /// failed to acquire a lock for, that should be awaited so we know when to run again.
1640    pub fn all_or_nothing(self, conn_id: &ConnectionId) -> Result<WriteLocks, CatalogItemId> {
1641        let (locks, missing): (BTreeMap<_, _>, BTreeSet<_>) =
1642            self.locks
1643                .into_iter()
1644                .partition_map(|(gid, lock)| match lock {
1645                    Some(lock) => itertools::Either::Left((gid, lock)),
1646                    None => itertools::Either::Right(gid),
1647                });
1648
1649        match missing.iter().next() {
1650            None => {
1651                tracing::info!(%conn_id, ?locks, "acquired write locks");
1652                Ok(WriteLocks {
1653                    locks,
1654                    conn_id: conn_id.clone(),
1655                })
1656            }
1657            Some(gid) => {
1658                tracing::info!(?missing, "failed to acquire write locks");
1659                // Explicitly drop the already acquired locks.
1660                drop(locks);
1661                Err(*gid)
1662            }
1663        }
1664    }
1665}
1666
/// Collection of [`WriteLocks`] gathered during [`group_commit`].
///
/// Note: This struct should __never__ be used outside of group commit. It merges together several
/// collections of [`WriteLocks`], and doing that carelessly can cause deadlocks or consistency
/// violations.
///
/// We must prevent writes to tables from occurring during read-then-write plans (e.g. `UPDATE`).
/// However, when the updates from a read-then-write plan are submitted, we can allow blind writes
/// (e.g. `INSERT`) to be committed concurrently, at the same timestamp.
///
/// Naively, it might seem that blind writes could be allowed at any time, since a blind write can
/// never cause an invalid retraction. Doing so, however, could violate serializability, because
/// there may be no total order in which the transactions could have executed. Consider the
/// following scenario:
///
/// ```text
/// table: foo
///
///  a | b
/// --------
///  x   2
///  y   3
///  z   4
///
/// -- Session(A)
/// -- read then write plan, reads at t0, writes at t3, transaction Ta
/// DELETE FROM foo WHERE b % 2 = 0;
///
///
/// -- Session(B)
/// -- blind write into foo, writes at t1, transaction Tb
/// INSERT INTO foo VALUES ('q', 6);
/// -- select from foo, reads at t2, transaction Tc
/// SELECT * FROM foo;
///
///
/// The times at which these operations occur are ordered:
/// t0 < t1 < t2 < t3
///
/// Given the timing of the operations, the transactions must have the following order:
///
/// * Ta does not observe ('q', 6), so Ta < Tb
/// * Tc does observe ('q', 6), so Tb < Tc
/// * Tc does not observe the retractions from Ta, so Tc < Ta
///
/// For a total order to exist, Ta < Tb < Tc < Ta, which is impossible.
/// ```
///
/// [`group_commit`]: super::coord::Coordinator::group_commit
#[derive(Debug, Default)]
pub(crate) struct GroupCommitWriteLocks {
    /// Held write-lock guards, keyed by the collection each one protects.
    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
}
1719
1720impl GroupCommitWriteLocks {
1721    /// Merge a set of [`WriteLocks`] into this collection for group commit.
1722    pub fn merge(&mut self, mut locks: WriteLocks) {
1723        // Note: Ideally we would use `.drain`, but that method doesn't exist for BTreeMap.
1724        //
1725        // See: <https://github.com/rust-lang/rust/issues/81074>
1726        let existing = std::mem::take(&mut locks.locks);
1727        self.locks.extend(existing);
1728    }
1729
1730    /// Returns the collections we're missing locks for, if any.
1731    pub fn missing_locks(
1732        &self,
1733        writes: impl Iterator<Item = CatalogItemId>,
1734    ) -> BTreeSet<CatalogItemId> {
1735        let mut missing = BTreeSet::new();
1736        for write in writes {
1737            if !self.locks.contains_key(&write) {
1738                missing.insert(write);
1739            }
1740        }
1741        missing
1742    }
1743}
1744
1745impl Drop for GroupCommitWriteLocks {
1746    fn drop(&mut self) {
1747        if !self.locks.is_empty() {
1748            tracing::info!(
1749                locks = ?self.locks,
1750                "dropping group commit write locks",
1751            );
1752        }
1753    }
1754}