1#![warn(missing_docs)]
13
14use std::collections::btree_map::Entry;
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Debug;
17use std::future::Future;
18use std::mem;
19use std::net::IpAddr;
20use std::pin::Pin;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use derivative::Derivative;
25use itertools::Itertools;
26use mz_adapter_types::connection::ConnectionId;
27use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
28use mz_controller_types::ClusterId;
29use mz_ore::metrics::{MetricsFutureExt, MetricsRegistry};
30use mz_ore::now::{EpochMillis, NowFn};
31use mz_pgwire_common::Format;
32use mz_repr::role_id::RoleId;
33use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
34use mz_repr::{CatalogItemId, Datum, Row, RowIterator, SqlScalarType, TimestampManipulation};
35use mz_sql::ast::{AstInfo, Raw, Statement, TransactionAccessMode};
36use mz_sql::plan::{Params, PlanContext, QueryWhen, StatementDesc};
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::user::{
39 INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER, RoleMetadata, SYSTEM_USER, User,
40};
41use mz_sql::session::vars::IsolationLevel;
42pub use mz_sql::session::vars::{
43 DEFAULT_DATABASE_NAME, EndTransactionAction, SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION,
44 SERVER_PATCH_VERSION, SessionVars, Var,
45};
46use mz_sql_parser::ast::TransactionIsolationLevel;
47use mz_storage_client::client::TableData;
48use mz_storage_types::sources::Timeline;
49use qcell::{QCell, QCellOwner};
50use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
51use tokio::sync::watch;
52use uuid::Uuid;
53
54use crate::catalog::CatalogState;
55use crate::client::RecordFirstRowStream;
56use crate::coord::appends::BuiltinTableAppendNotify;
57use crate::coord::in_memory_oracle::InMemoryTimestampOracle;
58use crate::coord::peek::PeekResponseUnary;
59use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
60use crate::coord::{Coordinator, ExplainContext};
61use crate::error::AdapterError;
62use crate::metrics::{Metrics, SessionMetrics};
63use crate::statement_logging::PreparedStatementLoggingInfo;
64use crate::{AdapterNotice, ExecuteContext};
65
/// Connection ID reserved for sessions built by `Session::dummy`.
///
/// `Session::new` asserts that real sessions never use it.
const DUMMY_CONNECTION_ID: ConnectionId = ConnectionId::Static(0);
67
/// Per-connection state: transaction status, prepared statements, portals,
/// session variables, notices and per-timeline session timestamp oracles.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Session<T = mz_repr::Timestamp>
where
    T: Debug + Clone + Send + Sync,
{
    /// The connection this session serves.
    conn_id: ConnectionId,
    /// The session's UUID, taken from `SessionConfig` at construction.
    uuid: Uuid,
    /// Prepared statements, keyed by name.
    prepared_statements: BTreeMap<String, PreparedStatement>,
    /// Portals, keyed by name. Cleared whenever the transaction is cleared.
    portals: BTreeMap<String, Portal>,
    /// The current transaction status.
    transaction: TransactionStatus<T>,
    /// Starts as `None` and is reset to `None` by `clear_transaction`.
    // NOTE(review): nothing in this chunk sets it to `Some`.
    pcx: Option<PlanContext>,
    /// Session-level metrics handles.
    metrics: SessionMetrics,
    /// Pending builtin-table-append notification (see `set_builtin_table_updates`).
    #[derivative(Debug = "ignore")]
    builtin_updates: Option<BuiltinTableAppendNotify>,

    /// Role metadata. It must be initialized (`initialize_role_metadata`)
    /// before `SessionMetadata::role_metadata` is called, which otherwise panics.
    role_metadata: Option<RoleMetadata>,
    /// The client's IP address, if known.
    client_ip: Option<IpAddr>,
    /// Session variables.
    vars: SessionVars,
    /// Sending half of the notice channel (cloned out by `retain_notice_transmitter`).
    notices_tx: mpsc::UnboundedSender<AdapterNotice>,
    /// Receiving half of the notice channel.
    ///
    /// The session also holds `notices_tx`, so `recv` never yields `None`.
    notices_rx: mpsc::UnboundedReceiver<AdapterNotice>,
    /// Id handed to the next transaction; incremented with wrapping arithmetic.
    next_transaction_id: TransactionId,
    /// Random key generated at session creation.
    // Presumably used to authorize query cancellation — confirm.
    secret_key: u32,
    /// Watch channel carrying updates to external user metadata, if any.
    external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    /// Owner of the `QCell`s holding statement logging info created by this session.
    #[derivative(Debug = "ignore")]
    qcell_owner: QCellOwner,
    /// Session-local timestamp oracles, one per timeline, created lazily.
    session_oracles: BTreeMap<Timeline, InMemoryTimestampOracle<T, NowFn<T>>>,
    /// Counter bumped on portal changes and when the transaction is cleared.
    state_revision: u64,
}
128
129impl<T> SessionMetadata for Session<T>
130where
131 T: Debug + Clone + Send + Sync,
132 T: TimestampManipulation,
133{
134 fn conn_id(&self) -> &ConnectionId {
135 &self.conn_id
136 }
137
138 fn client_ip(&self) -> Option<&IpAddr> {
139 self.client_ip.as_ref()
140 }
141
142 fn pcx(&self) -> &PlanContext {
143 &self
144 .transaction()
145 .inner()
146 .expect("no active transaction")
147 .pcx
148 }
149
150 fn role_metadata(&self) -> &RoleMetadata {
151 self.role_metadata
152 .as_ref()
153 .expect("role_metadata invariant violated")
154 }
155
156 fn vars(&self) -> &SessionVars {
157 &self.vars
158 }
159}
160
/// An owned snapshot of a session's metadata, produced by `Session::meta`.
#[derive(Debug)]
pub struct SessionMeta {
    /// The connection the session belongs to.
    conn_id: ConnectionId,
    /// The client's IP address, if known.
    client_ip: Option<IpAddr>,
    /// The plan context of the session's transaction at snapshot time.
    pcx: PlanContext,
    /// The session's role metadata at snapshot time.
    role_metadata: RoleMetadata,
    /// A copy of the session's variables at snapshot time.
    vars: SessionVars,
}
171
172impl SessionMetadata for SessionMeta {
173 fn vars(&self) -> &SessionVars {
174 &self.vars
175 }
176
177 fn conn_id(&self) -> &ConnectionId {
178 &self.conn_id
179 }
180
181 fn client_ip(&self) -> Option<&IpAddr> {
182 self.client_ip.as_ref()
183 }
184
185 fn pcx(&self) -> &PlanContext {
186 &self.pcx
187 }
188
189 fn role_metadata(&self) -> &RoleMetadata {
190 &self.role_metadata
191 }
192}
193
/// Parameters needed to construct a `Session`.
#[derive(Debug, Clone)]
pub struct SessionConfig {
    /// The connection ID for the session. Must not be the reserved dummy ID
    /// (`Session::new` asserts this).
    pub conn_id: ConnectionId,
    /// The UUID to assign to the session.
    pub uuid: Uuid,
    /// The client's IP address, if known.
    pub client_ip: Option<IpAddr>,
    /// The name of the user. It is also used to look up a default cluster for
    /// internal users.
    pub user: String,
    /// Optional watch channel carrying the user's external metadata. Its
    /// current value is read when the session is created.
    pub external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    /// Helm chart version, passed through to `SessionVars::new_unchecked`.
    pub helm_chart_version: Option<String>,
}
216
217impl<T: TimestampManipulation> Session<T> {
218 pub(crate) fn new(
220 build_info: &'static BuildInfo,
221 config: SessionConfig,
222 metrics: SessionMetrics,
223 ) -> Session<T> {
224 assert_ne!(config.conn_id, DUMMY_CONNECTION_ID);
225 Self::new_internal(build_info, config, metrics)
226 }
227
228 pub fn meta(&self) -> SessionMeta {
231 SessionMeta {
232 conn_id: self.conn_id().clone(),
233 client_ip: self.client_ip().copied(),
234 pcx: self.pcx().clone(),
235 role_metadata: self.role_metadata().clone(),
236 vars: self.vars.clone(),
237 }
238
239 }
241
242 pub(crate) fn mint_logging<A: AstInfo>(
252 &self,
253 raw_sql: String,
254 stmt: Option<&Statement<A>>,
255 now: EpochMillis,
256 ) -> Arc<QCell<PreparedStatementLoggingInfo>> {
257 Arc::new(QCell::new(
258 &self.qcell_owner,
259 PreparedStatementLoggingInfo::still_to_log(
260 raw_sql,
261 stmt,
262 now,
263 "".to_string(),
264 self.uuid,
265 false,
266 ),
267 ))
268 }
269
270 pub(crate) fn qcell_ro<'a, T2: 'a>(&'a self, cell: &'a Arc<QCell<T2>>) -> &'a T2 {
271 self.qcell_owner.ro(&*cell)
272 }
273
274 pub(crate) fn qcell_rw<'a, T2: 'a>(&'a mut self, cell: &'a Arc<QCell<T2>>) -> &'a mut T2 {
275 self.qcell_owner.rw(&*cell)
276 }
277
278 pub fn uuid(&self) -> Uuid {
281 self.uuid
282 }
283
284 pub fn dummy() -> Session<T> {
289 let registry = MetricsRegistry::new();
290 let metrics = Metrics::register_into(®istry);
291 let metrics = metrics.session_metrics();
292 let mut dummy = Self::new_internal(
293 &DUMMY_BUILD_INFO,
294 SessionConfig {
295 conn_id: DUMMY_CONNECTION_ID,
296 uuid: Uuid::new_v4(),
297 user: SYSTEM_USER.name.clone(),
298 client_ip: None,
299 external_metadata_rx: None,
300 helm_chart_version: None,
301 },
302 metrics,
303 );
304 dummy.initialize_role_metadata(RoleId::User(0));
305 dummy
306 }
307
308 fn new_internal(
309 build_info: &'static BuildInfo,
310 SessionConfig {
311 conn_id,
312 uuid,
313 user,
314 client_ip,
315 mut external_metadata_rx,
316 helm_chart_version,
317 }: SessionConfig,
318 metrics: SessionMetrics,
319 ) -> Session<T> {
320 let (notices_tx, notices_rx) = mpsc::unbounded_channel();
321 let default_cluster = INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER.get(&user);
322 let user = User {
323 name: user,
324 internal_metadata: None,
325 external_metadata: external_metadata_rx
326 .as_mut()
327 .map(|rx| rx.borrow_and_update().clone()),
328 };
329 let mut vars = SessionVars::new_unchecked(build_info, user, helm_chart_version);
330 if let Some(default_cluster) = default_cluster {
331 vars.set_cluster(default_cluster.clone());
332 }
333 Session {
334 conn_id,
335 uuid,
336 transaction: TransactionStatus::Default,
337 pcx: None,
338 metrics,
339 builtin_updates: None,
340 prepared_statements: BTreeMap::new(),
341 portals: BTreeMap::new(),
342 role_metadata: None,
343 client_ip,
344 vars,
345 notices_tx,
346 notices_rx,
347 next_transaction_id: 0,
348 secret_key: rand::random(),
349 external_metadata_rx,
350 qcell_owner: QCellOwner::new(),
351 session_oracles: BTreeMap::new(),
352 state_revision: 0,
353 }
354 }
355
356 pub fn secret_key(&self) -> u32 {
358 self.secret_key
359 }
360
361 fn new_pcx(&self, mut wall_time: DateTime<Utc>) -> PlanContext {
362 if let Some(mock_time) = self.vars().unsafe_new_transaction_wall_time() {
363 wall_time = *mock_time;
364 }
365 PlanContext::new(wall_time)
366 }
367
368 pub fn start_transaction(
371 &mut self,
372 wall_time: DateTime<Utc>,
373 access: Option<TransactionAccessMode>,
374 isolation_level: Option<TransactionIsolationLevel>,
375 ) -> Result<(), AdapterError> {
376 if let Some(txn) = self.transaction.inner() {
378 let read_write_prohibited = match txn.ops {
382 TransactionOps::Peeks { .. } | TransactionOps::Subscribe => {
383 txn.access == Some(TransactionAccessMode::ReadOnly)
384 }
385 TransactionOps::None
386 | TransactionOps::Writes(_)
387 | TransactionOps::SingleStatement { .. }
388 | TransactionOps::DDL { .. } => false,
389 };
390
391 if read_write_prohibited && access == Some(TransactionAccessMode::ReadWrite) {
392 return Err(AdapterError::ReadWriteUnavailable);
393 }
394 }
395
396 match std::mem::take(&mut self.transaction) {
397 TransactionStatus::Default => {
398 let id = self.next_transaction_id;
399 self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
400 self.transaction = TransactionStatus::InTransaction(Transaction {
401 pcx: self.new_pcx(wall_time),
402 ops: TransactionOps::None,
403 write_lock_guards: None,
404 access,
405 id,
406 });
407 }
408 TransactionStatus::Started(mut txn)
409 | TransactionStatus::InTransactionImplicit(mut txn)
410 | TransactionStatus::InTransaction(mut txn) => {
411 if access.is_some() {
412 txn.access = access;
413 }
414 self.transaction = TransactionStatus::InTransaction(txn);
415 }
416 TransactionStatus::Failed(_) => unreachable!(),
417 };
418
419 if let Some(isolation_level) = isolation_level {
420 self.vars
421 .set_local_transaction_isolation(isolation_level.into());
422 }
423
424 Ok(())
425 }
426
427 pub fn start_transaction_implicit(&mut self, wall_time: DateTime<Utc>, stmts: usize) {
430 if let TransactionStatus::Default = self.transaction {
431 let id = self.next_transaction_id;
432 self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
433 let txn = Transaction {
434 pcx: self.new_pcx(wall_time),
435 ops: TransactionOps::None,
436 write_lock_guards: None,
437 access: None,
438 id,
439 };
440 match stmts {
441 1 => self.transaction = TransactionStatus::Started(txn),
442 n if n > 1 => self.transaction = TransactionStatus::InTransactionImplicit(txn),
443 _ => {}
444 }
445 }
446 }
447
448 pub fn start_transaction_single_stmt(&mut self, wall_time: DateTime<Utc>) {
450 self.start_transaction_implicit(wall_time, 1);
451 }
452
453 #[must_use]
463 pub fn clear_transaction(&mut self) -> TransactionStatus<T> {
464 self.portals.clear();
465 self.pcx = None;
466 self.state_revision += 1;
467 mem::take(&mut self.transaction)
468 }
469
470 pub fn fail_transaction(mut self) -> Self {
472 match self.transaction {
473 TransactionStatus::Default => unreachable!(),
474 TransactionStatus::Started(txn)
475 | TransactionStatus::InTransactionImplicit(txn)
476 | TransactionStatus::InTransaction(txn) => {
477 self.transaction = TransactionStatus::Failed(txn);
478 }
479 TransactionStatus::Failed(_) => {}
480 };
481 self
482 }
483
484 pub fn transaction(&self) -> &TransactionStatus<T> {
486 &self.transaction
487 }
488
489 pub fn transaction_mut(&mut self) -> &mut TransactionStatus<T> {
491 &mut self.transaction
492 }
493
494 pub fn transaction_code(&self) -> TransactionCode {
496 self.transaction().into()
497 }
498
499 pub fn add_transaction_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
503 self.transaction.add_ops(add_ops)
504 }
505
506 pub fn retain_notice_transmitter(&self) -> UnboundedSender<AdapterNotice> {
508 self.notices_tx.clone()
509 }
510
511 pub fn add_notice(&self, notice: AdapterNotice) {
513 self.add_notices([notice])
514 }
515
516 pub fn add_notices(&self, notices: impl IntoIterator<Item = AdapterNotice>) {
518 for notice in notices {
519 let _ = self.notices_tx.send(notice);
520 }
521 }
522
523 pub async fn recv_notice(&mut self) -> AdapterNotice {
527 loop {
529 let notice = self
530 .notices_rx
531 .recv()
532 .await
533 .expect("Session also holds a sender, so recv won't ever return None");
534 match self.notice_filter(notice) {
535 Some(notice) => return notice,
536 None => continue,
537 }
538 }
539 }
540
541 pub fn drain_notices(&mut self) -> Vec<AdapterNotice> {
543 let mut notices = Vec::new();
544 while let Ok(notice) = self.notices_rx.try_recv() {
545 if let Some(notice) = self.notice_filter(notice) {
546 notices.push(notice);
547 }
548 }
549 notices
550 }
551
552 fn notice_filter(&self, notice: AdapterNotice) -> Option<AdapterNotice> {
554 let minimum_client_severity = self.vars.client_min_messages();
556 let sev = notice.severity();
557 if !minimum_client_severity.should_output_to_client(&sev) {
558 return None;
559 }
560 if let AdapterNotice::ClusterReplicaStatusChanged { cluster, .. } = ¬ice {
562 if cluster != self.vars.cluster() {
563 return None;
564 }
565 }
566 Some(notice)
567 }
568
569 pub fn clear_transaction_ops(&mut self) {
572 if let Some(txn) = self.transaction.inner_mut() {
573 txn.ops = TransactionOps::None;
574 }
575 }
576
577 pub fn take_transaction_timestamp_context(&mut self) -> Option<TimestampContext<T>> {
582 if let Some(Transaction { ops, .. }) = self.transaction.inner_mut() {
583 if let TransactionOps::Peeks { .. } = ops {
584 let ops = std::mem::take(ops);
585 Some(
586 ops.timestamp_determination()
587 .expect("checked above")
588 .timestamp_context,
589 )
590 } else {
591 None
592 }
593 } else {
594 None
595 }
596 }
597
598 pub fn get_transaction_timestamp_determination(&self) -> Option<TimestampDetermination<T>> {
603 match self.transaction.inner() {
604 Some(Transaction {
605 pcx: _,
606 ops: TransactionOps::Peeks { determination, .. },
607 write_lock_guards: _,
608 access: _,
609 id: _,
610 }) => Some(determination.clone()),
611 _ => None,
612 }
613 }
614
615 pub fn contains_read_timestamp(&self) -> bool {
617 matches!(
618 self.transaction.inner(),
619 Some(Transaction {
620 pcx: _,
621 ops: TransactionOps::Peeks {
622 determination: TimestampDetermination {
623 timestamp_context: TimestampContext::TimelineTimestamp { .. },
624 ..
625 },
626 ..
627 },
628 write_lock_guards: _,
629 access: _,
630 id: _,
631 })
632 )
633 }
634
635 pub fn set_prepared_statement(
637 &mut self,
638 name: String,
639 stmt: Option<Statement<Raw>>,
640 raw_sql: String,
641 desc: StatementDesc,
642 state_revision: StateRevision,
643 now: EpochMillis,
644 ) {
645 let logging = PreparedStatementLoggingInfo::still_to_log(
646 raw_sql,
647 stmt.as_ref(),
648 now,
649 name.clone(),
650 self.uuid,
651 false,
652 );
653 let statement = PreparedStatement {
654 stmt,
655 desc,
656 state_revision,
657 logging: Arc::new(QCell::new(&self.qcell_owner, logging)),
658 };
659 self.prepared_statements.insert(name, statement);
660 }
661
662 pub fn remove_prepared_statement(&mut self, name: &str) -> bool {
666 self.prepared_statements.remove(name).is_some()
667 }
668
669 pub fn remove_all_prepared_statements(&mut self) {
671 self.prepared_statements.clear();
672 }
673
674 pub fn get_prepared_statement_unverified(&self, name: &str) -> Option<&PreparedStatement> {
679 self.prepared_statements.get(name)
680 }
681
682 pub fn get_prepared_statement_mut_unverified(
687 &mut self,
688 name: &str,
689 ) -> Option<&mut PreparedStatement> {
690 self.prepared_statements.get_mut(name)
691 }
692
693 pub fn prepared_statements(&self) -> &BTreeMap<String, PreparedStatement> {
695 &self.prepared_statements
696 }
697
698 pub fn portals(&self) -> &BTreeMap<String, Portal> {
700 &self.portals
701 }
702
703 pub fn set_portal(
713 &mut self,
714 portal_name: String,
715 desc: StatementDesc,
716 stmt: Option<Statement<Raw>>,
717 logging: Arc<QCell<PreparedStatementLoggingInfo>>,
718 params: Vec<(Datum, SqlScalarType)>,
719 result_formats: Vec<Format>,
720 state_revision: StateRevision,
721 ) -> Result<(), AdapterError> {
722 if !portal_name.is_empty() && self.portals.contains_key(&portal_name) {
724 return Err(AdapterError::DuplicateCursor(portal_name));
725 }
726 self.state_revision += 1;
727 let param_types = desc.param_types.clone();
728 self.portals.insert(
729 portal_name,
730 Portal {
731 stmt: stmt.map(Arc::new),
732 desc,
733 state_revision,
734 parameters: Params {
735 datums: Row::pack(params.iter().map(|(d, _t)| d)),
736 execute_types: params.into_iter().map(|(_d, t)| t).collect(),
737 expected_types: param_types,
738 },
739 result_formats,
740 state: PortalState::NotStarted,
741 logging,
742 lifecycle_timestamps: None,
743 },
744 );
745 Ok(())
746 }
747
748 pub fn remove_portal(&mut self, portal_name: &str) -> bool {
752 self.state_revision += 1;
753 self.portals.remove(portal_name).is_some()
754 }
755
756 pub fn get_portal_unverified(&self, portal_name: &str) -> Option<&Portal> {
760 self.portals.get(portal_name)
761 }
762
763 pub fn get_portal_unverified_mut(&mut self, portal_name: &str) -> Option<PortalRefMut<'_>> {
770 self.portals.get_mut(portal_name).map(|p| PortalRefMut {
771 stmt: &p.stmt,
772 desc: &p.desc,
773 state_revision: &mut p.state_revision,
774 parameters: &mut p.parameters,
775 result_formats: &mut p.result_formats,
776 logging: &mut p.logging,
777 state: &mut p.state,
778 lifecycle_timestamps: &mut p.lifecycle_timestamps,
779 })
780 }
781
782 pub fn create_new_portal(
784 &mut self,
785 stmt: Option<Statement<Raw>>,
786 logging: Arc<QCell<PreparedStatementLoggingInfo>>,
787 desc: StatementDesc,
788 parameters: Params,
789 result_formats: Vec<Format>,
790 state_revision: StateRevision,
791 ) -> Result<String, AdapterError> {
792 self.state_revision += 1;
793
794 for i in 0usize.. {
796 let name = format!("<unnamed portal {}>", i);
797 match self.portals.entry(name.clone()) {
798 Entry::Occupied(_) => continue,
799 Entry::Vacant(entry) => {
800 entry.insert(Portal {
801 stmt: stmt.map(Arc::new),
802 desc,
803 state_revision,
804 parameters,
805 result_formats,
806 state: PortalState::NotStarted,
807 logging,
808 lifecycle_timestamps: None,
809 });
810 return Ok(name);
811 }
812 }
813 }
814
815 coord_bail!("unable to create a new portal");
816 }
817
818 pub fn reset(&mut self) {
821 let _ = self.clear_transaction();
822 self.prepared_statements.clear();
823 self.vars.reset_all();
824 }
825
826 pub fn application_name(&self) -> &str {
830 self.vars.application_name()
831 }
832
833 pub fn vars(&self) -> &SessionVars {
835 &self.vars
836 }
837
838 pub fn vars_mut(&mut self) -> &mut SessionVars {
840 &mut self.vars
841 }
842
843 pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
849 self.transaction.try_grant_write_locks(guards)
850 }
851
852 pub fn apply_external_metadata_updates(&mut self) {
854 let Some(rx) = &mut self.external_metadata_rx else {
856 return;
857 };
858
859 if !rx.has_changed().unwrap_or(false) {
861 return;
862 }
863
864 let metadata = rx.borrow_and_update().clone();
867 self.vars.set_external_user_metadata(metadata);
868 }
869
870 pub fn apply_internal_user_metadata(&mut self, metadata: InternalUserMetadata) {
872 self.vars.set_internal_user_metadata(metadata);
873 }
874
875 pub fn initialize_role_metadata(&mut self, role_id: RoleId) {
877 self.role_metadata = Some(RoleMetadata::new(role_id));
878 }
879
880 pub fn ensure_timestamp_oracle(
883 &mut self,
884 timeline: Timeline,
885 ) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
886 self.session_oracles
887 .entry(timeline)
888 .or_insert_with(|| InMemoryTimestampOracle::new(T::minimum(), NowFn::from(T::minimum)))
889 }
890
891 pub fn ensure_local_timestamp_oracle(&mut self) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
894 self.ensure_timestamp_oracle(Timeline::EpochMilliseconds)
895 }
896
897 pub fn get_timestamp_oracle(
899 &self,
900 timeline: &Timeline,
901 ) -> Option<&InMemoryTimestampOracle<T, NowFn<T>>> {
902 self.session_oracles.get(timeline)
903 }
904
905 pub fn apply_write(&mut self, timestamp: T) {
908 if self.vars().transaction_isolation() == &IsolationLevel::StrongSessionSerializable {
909 self.ensure_local_timestamp_oracle().apply_write(timestamp);
910 }
911 }
912
913 pub fn metrics(&self) -> &SessionMetrics {
915 &self.metrics
916 }
917
918 pub fn set_builtin_table_updates(&mut self, fut: BuiltinTableAppendNotify) {
920 let prev = self.builtin_updates.replace(fut);
921 mz_ore::soft_assert_or_log!(prev.is_none(), "replacing old builtin table notify");
922 }
923
924 pub fn clear_builtin_table_updates(&mut self) -> Option<impl Future<Output = ()> + 'static> {
927 if let Some(fut) = self.builtin_updates.take() {
928 let histogram = self
930 .metrics()
931 .session_startup_table_writes_seconds()
932 .clone();
933 Some(async move {
934 fut.wall_time().observe(histogram).await;
935 })
936 } else {
937 None
938 }
939 }
940
941 pub fn state_revision(&self) -> u64 {
944 self.state_revision
945 }
946}
947
/// A prepared statement registered in a session (see `Session::set_prepared_statement`).
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct PreparedStatement {
    /// The parsed statement, if any.
    stmt: Option<Statement<Raw>>,
    /// The statement's description.
    desc: StatementDesc,
    /// The state revision recorded when the statement was prepared.
    pub state_revision: StateRevision,
    /// Statement logging info, in a `QCell` owned by the session's `QCellOwner`.
    #[derivative(Debug = "ignore")]
    logging: Arc<QCell<PreparedStatementLoggingInfo>>,
}
959
960impl PreparedStatement {
961 pub fn stmt(&self) -> Option<&Statement<Raw>> {
964 self.stmt.as_ref()
965 }
966
967 pub fn desc(&self) -> &StatementDesc {
969 &self.desc
970 }
971
972 pub fn logging(&self) -> &Arc<QCell<PreparedStatementLoggingInfo>> {
974 &self.logging
975 }
976}
977
/// A portal: a bound statement together with its execution state.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Portal {
    /// The statement being executed, if any.
    pub stmt: Option<Arc<Statement<Raw>>>,
    /// The statement's description.
    pub desc: StatementDesc,
    /// The state revision recorded when the portal was created.
    pub state_revision: StateRevision,
    /// The bound parameters.
    pub parameters: Params,
    /// The requested result formats.
    pub result_formats: Vec<Format>,
    /// Statement logging info.
    #[derivative(Debug = "ignore")]
    pub logging: Arc<QCell<PreparedStatementLoggingInfo>>,
    /// Execution state; starts as `PortalState::NotStarted`.
    #[derivative(Debug = "ignore")]
    pub state: PortalState,
    /// Lifecycle timestamps, if recorded; starts as `None`.
    pub lifecycle_timestamps: Option<LifecycleTimestamps>,
}
1001
/// A view into a `Portal` that allows mutating every field except `stmt` and
/// `desc` (returned by `Session::get_portal_unverified_mut`).
pub struct PortalRefMut<'a> {
    /// The statement being executed, if any (read-only).
    pub stmt: &'a Option<Arc<Statement<Raw>>>,
    /// The statement's description (read-only).
    pub desc: &'a StatementDesc,
    /// The portal's recorded state revision.
    pub state_revision: &'a mut StateRevision,
    /// The bound parameters.
    pub parameters: &'a mut Params,
    /// The requested result formats.
    pub result_formats: &'a mut Vec<Format>,
    /// Statement logging info.
    pub logging: &'a mut Arc<QCell<PreparedStatementLoggingInfo>>,
    /// Execution state.
    pub state: &'a mut PortalState,
    /// Lifecycle timestamps.
    pub lifecycle_timestamps: &'a mut Option<LifecycleTimestamps>,
}
1024
/// A pair of revision counters recorded on prepared statements and portals.
// Presumably compared against current values to detect staleness — confirm
// with callers.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct StateRevision {
    /// The catalog revision.
    pub catalog_revision: u64,
    /// The session state revision (cf. `Session::state_revision`).
    pub session_state_revision: u64,
}
1035
/// Execution state of a portal.
pub enum PortalState {
    /// The portal has not begun executing.
    NotStarted,
    /// The portal is partway through execution, holding its remaining rows if any.
    InProgress(Option<InProgressRows>),
    /// Execution finished.
    // NOTE(review): the optional string is presumably the command-complete
    // tag — confirm.
    Completed(Option<String>),
}
1048
/// Rows still to be returned from a partially executed portal.
pub struct InProgressRows {
    /// The batch currently being consumed, if any.
    pub current: Option<Box<dyn RowIterator + Send + Sync>>,
    /// The stream producing further rows.
    pub remaining: RecordFirstRowStream,
}
1056
1057impl InProgressRows {
1058 pub fn new(remaining: RecordFirstRowStream) -> Self {
1060 Self {
1061 current: None,
1062 remaining,
1063 }
1064 }
1065
1066 pub fn no_more_rows(&self) -> bool {
1069 self.remaining.no_more_rows && self.current.is_none()
1070 }
1071}
1072
/// A receiver of peek responses.
pub type RowBatchStream = UnboundedReceiver<PeekResponseUnary>;
1075
/// Timestamps recorded over the lifecycle of a statement.
#[derive(Debug, Clone)]
pub struct LifecycleTimestamps {
    /// When the statement was received, as `EpochMillis`.
    pub received: EpochMillis,
}
1086
1087impl LifecycleTimestamps {
1088 pub fn new(received: EpochMillis) -> Self {
1090 Self { received }
1091 }
1092}
1093
/// The transaction state of a session.
#[derive(Debug)]
pub enum TransactionStatus<T> {
    /// No transaction exists.
    Default,
    /// An implicit transaction for a single statement (see
    /// `Session::start_transaction_implicit`).
    Started(Transaction<T>),
    /// An explicit transaction, entered via `Session::start_transaction`.
    InTransaction(Transaction<T>),
    /// An implicit transaction spanning several statements (see
    /// `Session::start_transaction_implicit`).
    InTransactionImplicit(Transaction<T>),
    /// A transaction that has failed (see `Session::fail_transaction`).
    Failed(Transaction<T>),
}
1119
1120impl<T: TimestampManipulation> TransactionStatus<T> {
1121 pub fn into_ops_and_lock_guard(self) -> (Option<TransactionOps<T>>, Option<WriteLocks>) {
1123 match self {
1124 TransactionStatus::Default | TransactionStatus::Failed(_) => (None, None),
1125 TransactionStatus::Started(txn)
1126 | TransactionStatus::InTransaction(txn)
1127 | TransactionStatus::InTransactionImplicit(txn) => {
1128 (Some(txn.ops), txn.write_lock_guards)
1129 }
1130 }
1131 }
1132
1133 pub fn inner(&self) -> Option<&Transaction<T>> {
1135 match self {
1136 TransactionStatus::Default => None,
1137 TransactionStatus::Started(txn)
1138 | TransactionStatus::InTransaction(txn)
1139 | TransactionStatus::InTransactionImplicit(txn)
1140 | TransactionStatus::Failed(txn) => Some(txn),
1141 }
1142 }
1143
1144 pub fn inner_mut(&mut self) -> Option<&mut Transaction<T>> {
1146 match self {
1147 TransactionStatus::Default => None,
1148 TransactionStatus::Started(txn)
1149 | TransactionStatus::InTransaction(txn)
1150 | TransactionStatus::InTransactionImplicit(txn)
1151 | TransactionStatus::Failed(txn) => Some(txn),
1152 }
1153 }
1154
1155 pub fn is_ddl(&self) -> bool {
1157 match self {
1158 TransactionStatus::Default => false,
1159 TransactionStatus::Started(txn)
1160 | TransactionStatus::InTransaction(txn)
1161 | TransactionStatus::InTransactionImplicit(txn)
1162 | TransactionStatus::Failed(txn) => {
1163 matches!(txn.ops, TransactionOps::DDL { .. })
1164 }
1165 }
1166 }
1167
1168 pub fn is_implicit(&self) -> bool {
1171 match self {
1172 TransactionStatus::Started(_) | TransactionStatus::InTransactionImplicit(_) => true,
1173 TransactionStatus::Default
1174 | TransactionStatus::InTransaction(_)
1175 | TransactionStatus::Failed(_) => false,
1176 }
1177 }
1178
1179 pub fn is_in_multi_statement_transaction(&self) -> bool {
1181 match self {
1182 TransactionStatus::InTransaction(_) | TransactionStatus::InTransactionImplicit(_) => {
1183 true
1184 }
1185 TransactionStatus::Default
1186 | TransactionStatus::Started(_)
1187 | TransactionStatus::Failed(_) => false,
1188 }
1189 }
1190
1191 pub fn in_immediate_multi_stmt_txn(&self, when: &QueryWhen) -> bool {
1193 self.is_in_multi_statement_transaction() && when == &QueryWhen::Immediately
1194 }
1195
1196 pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1205 match self {
1206 TransactionStatus::Default => panic!("cannot grant write lock to txn not yet started"),
1207 TransactionStatus::Started(txn)
1208 | TransactionStatus::InTransaction(txn)
1209 | TransactionStatus::InTransactionImplicit(txn)
1210 | TransactionStatus::Failed(txn) => txn.try_grant_write_locks(guards),
1211 }
1212 }
1213
1214 pub fn write_locks(&self) -> Option<&WriteLocks> {
1216 match self {
1217 TransactionStatus::Default => None,
1218 TransactionStatus::Started(txn)
1219 | TransactionStatus::InTransaction(txn)
1220 | TransactionStatus::InTransactionImplicit(txn)
1221 | TransactionStatus::Failed(txn) => txn.write_lock_guards.as_ref(),
1222 }
1223 }
1224
1225 pub fn timeline(&self) -> Option<Timeline> {
1227 match self {
1228 TransactionStatus::Default => None,
1229 TransactionStatus::Started(txn)
1230 | TransactionStatus::InTransaction(txn)
1231 | TransactionStatus::InTransactionImplicit(txn)
1232 | TransactionStatus::Failed(txn) => txn.timeline(),
1233 }
1234 }
1235
1236 pub fn cluster(&self) -> Option<ClusterId> {
1238 match self {
1239 TransactionStatus::Default => None,
1240 TransactionStatus::Started(txn)
1241 | TransactionStatus::InTransaction(txn)
1242 | TransactionStatus::InTransactionImplicit(txn)
1243 | TransactionStatus::Failed(txn) => txn.cluster(),
1244 }
1245 }
1246
1247 pub fn catalog_state(&self) -> Option<&CatalogState> {
1249 match self.inner() {
1250 Some(Transaction {
1251 ops: TransactionOps::DDL { state, .. },
1252 ..
1253 }) => Some(state),
1254 _ => None,
1255 }
1256 }
1257
1258 pub fn contains_ops(&self) -> bool {
1260 match self.inner() {
1261 Some(txn) => txn.contains_ops(),
1262 None => false,
1263 }
1264 }
1265
1266 pub fn allows_writes(&self) -> bool {
1270 match self {
1271 TransactionStatus::Started(Transaction { ops, access, .. })
1272 | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1273 | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1274 match ops {
1275 TransactionOps::None => access != &Some(TransactionAccessMode::ReadOnly),
1276 TransactionOps::Peeks { determination, .. } => {
1277 !determination.timestamp_context.contains_timestamp()
1281 }
1282 TransactionOps::Subscribe => false,
1283 TransactionOps::Writes(_) => true,
1284 TransactionOps::SingleStatement { .. } => false,
1285 TransactionOps::DDL { .. } => false,
1286 }
1287 }
1288 TransactionStatus::Default | TransactionStatus::Failed(_) => {
1289 unreachable!()
1290 }
1291 }
1292 }
1293
1294 pub fn add_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
1307 match self {
1308 TransactionStatus::Started(Transaction { ops, access, .. })
1309 | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1310 | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1311 match ops {
1312 TransactionOps::None => {
1313 if matches!(access, Some(TransactionAccessMode::ReadOnly))
1314 && matches!(add_ops, TransactionOps::Writes(_))
1315 {
1316 return Err(AdapterError::ReadOnlyTransaction);
1317 }
1318 *ops = add_ops;
1319 }
1320 TransactionOps::Peeks {
1321 determination,
1322 cluster_id,
1323 requires_linearization,
1324 } => match add_ops {
1325 TransactionOps::Peeks {
1326 determination: add_timestamp_determination,
1327 cluster_id: add_cluster_id,
1328 requires_linearization: add_requires_linearization,
1329 } => {
1330 assert_eq!(*cluster_id, add_cluster_id);
1331 match (
1332 &determination.timestamp_context,
1333 &add_timestamp_determination.timestamp_context,
1334 ) {
1335 (
1336 TimestampContext::TimelineTimestamp {
1337 timeline: txn_timeline,
1338 chosen_ts: txn_ts,
1339 oracle_ts: _,
1340 },
1341 TimestampContext::TimelineTimestamp {
1342 timeline: add_timeline,
1343 chosen_ts: add_ts,
1344 oracle_ts: _,
1345 },
1346 ) => {
1347 assert_eq!(txn_timeline, add_timeline);
1348 assert_eq!(txn_ts, add_ts);
1349 }
1350 (TimestampContext::NoTimestamp, _) => {
1351 *determination = add_timestamp_determination
1352 }
1353 (_, TimestampContext::NoTimestamp) => {}
1354 };
1355 if matches!(requires_linearization, RequireLinearization::NotRequired)
1356 && matches!(
1357 add_requires_linearization,
1358 RequireLinearization::Required
1359 )
1360 {
1361 *requires_linearization = add_requires_linearization;
1362 }
1363 }
1364 writes @ TransactionOps::Writes(..)
1368 if !determination.timestamp_context.contains_timestamp() =>
1369 {
1370 *ops = writes;
1371 }
1372 _ => return Err(AdapterError::ReadOnlyTransaction),
1373 },
1374 TransactionOps::Subscribe => {
1375 return Err(AdapterError::SubscribeOnlyTransaction);
1376 }
1377 TransactionOps::Writes(txn_writes) => match add_ops {
1378 TransactionOps::Writes(mut add_writes) => {
1379 assert!(!matches!(access, Some(TransactionAccessMode::ReadOnly)));
1382 txn_writes.append(&mut add_writes);
1383 }
1384 TransactionOps::Peeks { determination, .. }
1387 if !determination.timestamp_context.contains_timestamp() => {}
1388 _ => {
1389 return Err(AdapterError::WriteOnlyTransaction);
1390 }
1391 },
1392 TransactionOps::SingleStatement { .. } => {
1393 return Err(AdapterError::SingleStatementTransaction);
1394 }
1395 TransactionOps::DDL {
1396 ops: og_ops,
1397 revision: og_revision,
1398 state: og_state,
1399 side_effects,
1400 } => match add_ops {
1401 TransactionOps::DDL {
1402 ops: new_ops,
1403 revision: new_revision,
1404 side_effects: mut net_new_side_effects,
1405 state: new_state,
1406 } => {
1407 if *og_revision != new_revision {
1408 return Err(AdapterError::DDLTransactionRace);
1409 }
1410 if !new_ops.is_empty() {
1412 *og_ops = new_ops;
1413 *og_state = new_state;
1414 }
1415 side_effects.append(&mut net_new_side_effects);
1416 }
1417 _ => return Err(AdapterError::DDLOnlyTransaction),
1418 },
1419 }
1420 }
1421 TransactionStatus::Default | TransactionStatus::Failed(_) => {
1422 unreachable!()
1423 }
1424 }
1425 Ok(())
1426 }
1427}
1428
/// Identifier assigned to each transaction started in a session.
pub type TransactionId = u64;
1431
1432impl<T> Default for TransactionStatus<T> {
1433 fn default() -> Self {
1434 TransactionStatus::Default
1435 }
1436}
1437
/// State of a single transaction.
#[derive(Debug)]
pub struct Transaction<T> {
    /// Plan context created when the transaction started.
    pub pcx: PlanContext,
    /// Operations recorded in the transaction so far.
    pub ops: TransactionOps<T>,
    /// The transaction's id within its session.
    pub id: TransactionId,
    /// Write locks held by the transaction, if granted
    /// (see `try_grant_write_locks`).
    write_lock_guards: Option<WriteLocks>,
    /// Access mode requested for the transaction, if any.
    access: Option<TransactionAccessMode>,
}
1455
1456impl<T> Transaction<T> {
1457 fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1460 match &mut self.write_lock_guards {
1461 Some(existing) => Err(existing),
1462 locks @ None => {
1463 *locks = Some(guards);
1464 Ok(())
1465 }
1466 }
1467 }
1468
1469 fn timeline(&self) -> Option<Timeline> {
1471 match &self.ops {
1472 TransactionOps::Peeks {
1473 determination:
1474 TimestampDetermination {
1475 timestamp_context: TimestampContext::TimelineTimestamp { timeline, .. },
1476 ..
1477 },
1478 ..
1479 } => Some(timeline.clone()),
1480 TransactionOps::Peeks { .. }
1481 | TransactionOps::None
1482 | TransactionOps::Subscribe
1483 | TransactionOps::Writes(_)
1484 | TransactionOps::SingleStatement { .. }
1485 | TransactionOps::DDL { .. } => None,
1486 }
1487 }
1488
1489 pub fn cluster(&self) -> Option<ClusterId> {
1491 match &self.ops {
1492 TransactionOps::Peeks { cluster_id, .. } => Some(cluster_id.clone()),
1493 TransactionOps::None
1494 | TransactionOps::Subscribe
1495 | TransactionOps::Writes(_)
1496 | TransactionOps::SingleStatement { .. }
1497 | TransactionOps::DDL { .. } => None,
1498 }
1499 }
1500
1501 fn contains_ops(&self) -> bool {
1503 !matches!(self.ops, TransactionOps::None)
1504 }
1505}
1506
/// The transaction-status indicator reported to clients.
///
/// It converts into the single ASCII bytes `I`, `T` and `E`, the values used by
/// the PostgreSQL `ReadyForQuery` transaction-status indicator.
#[derive(Debug, Clone, Copy)]
pub enum TransactionCode {
    /// No transaction is in progress.
    Idle,
    /// A transaction is in progress.
    InTransaction,
    /// The current transaction has failed.
    Failed,
}

impl From<TransactionCode> for u8 {
    /// Encodes the code as its one-byte ASCII indicator.
    fn from(code: TransactionCode) -> u8 {
        use TransactionCode::{Failed, Idle, InTransaction};
        match code {
            Idle => b'I',
            InTransaction => b'T',
            Failed => b'E',
        }
    }
}

impl From<TransactionCode> for String {
    /// Renders the code as a one-character string (`"I"`, `"T"` or `"E"`).
    fn from(code: TransactionCode) -> String {
        String::from(char::from(u8::from(code)))
    }
}
1533
1534impl<T> From<&TransactionStatus<T>> for TransactionCode {
1535 fn from(status: &TransactionStatus<T>) -> TransactionCode {
1537 match status {
1538 TransactionStatus::Default => TransactionCode::Idle,
1539 TransactionStatus::Started(_) => TransactionCode::InTransaction,
1540 TransactionStatus::InTransaction(_) => TransactionCode::InTransaction,
1541 TransactionStatus::InTransactionImplicit(_) => TransactionCode::InTransaction,
1542 TransactionStatus::Failed(_) => TransactionCode::Failed,
1543 }
1544 }
1545}
1546
/// The operations performed so far by a [`Transaction`].
///
/// A transaction holds a single kind of operation. When new operations are
/// added, they are merged into the existing variant, or the addition is
/// rejected with an error.
#[derive(Derivative)]
#[derivative(Debug)]
pub enum TransactionOps<T> {
    /// No operations have been recorded yet.
    None,
    /// The transaction has performed peeks.
    Peeks {
        /// The timestamp determination the peeks were executed with.
        determination: TimestampDetermination<T>,
        /// The cluster the peeks were executed on.
        cluster_id: ClusterId,
        /// Whether the peeks require linearization. Upgraded to `Required` when
        /// a peek that requires linearization is added.
        requires_linearization: RequireLinearization,
    },
    /// The transaction contains a subscribe. Adding any further operation fails
    /// with `SubscribeOnlyTransaction`.
    Subscribe,
    /// Buffered writes. Further writes are appended to this list.
    Writes(Vec<WriteOp>),
    /// A statement that must run as the only statement of its transaction.
    /// Adding any further operation fails with `SingleStatementTransaction`.
    SingleStatement {
        /// The statement to execute.
        stmt: Arc<Statement<Raw>>,
        /// The parameters bound to `stmt`.
        params: mz_sql::plan::Params,
    },
    /// Catalog (DDL) operations.
    DDL {
        /// The catalog operations. When more DDL with a non-empty op list is
        /// added, this list (and `state`) is replaced by the incoming one.
        ops: Vec<crate::catalog::Op>,
        /// The catalog state that accompanies `ops`.
        state: CatalogState,
        /// Deferred side effects. Each receives the coordinator and an optional
        /// execute context and returns a future. Side effects from later DDL
        /// statements are appended here.
        #[derivative(Debug = "ignore")]
        side_effects: Vec<
            Box<
                dyn for<'a> FnOnce(
                        &'a mut Coordinator,
                        Option<&'a mut ExecuteContext>,
                    ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
                    + Send
                    + Sync,
            >,
        >,
        /// Revision this DDL was planned against. Adding DDL with a different
        /// revision fails with `DDLTransactionRace`.
        revision: u64,
    },
}
1606
1607impl<T> TransactionOps<T> {
1608 fn timestamp_determination(self) -> Option<TimestampDetermination<T>> {
1609 match self {
1610 TransactionOps::Peeks { determination, .. } => Some(determination),
1611 TransactionOps::None
1612 | TransactionOps::Subscribe
1613 | TransactionOps::Writes(_)
1614 | TransactionOps::SingleStatement { .. }
1615 | TransactionOps::DDL { .. } => None,
1616 }
1617 }
1618}
1619
1620impl<T> Default for TransactionOps<T> {
1621 fn default() -> Self {
1622 Self::None
1623 }
1624}
1625
/// A buffered write of `rows` to the catalog item `id`.
#[derive(Debug, Clone, PartialEq)]
pub struct WriteOp {
    /// The catalog item the rows are written to.
    pub id: CatalogItemId,
    /// The data to write.
    pub rows: TableData,
}
1634
/// Whether a read must be linearized.
#[derive(Debug)]
pub enum RequireLinearization {
    /// Linearization is required.
    Required,
    /// Linearization is not required.
    NotRequired,
}
1643
1644impl From<&ExplainContext> for RequireLinearization {
1645 fn from(ctx: &ExplainContext) -> Self {
1646 match ctx {
1647 ExplainContext::None | ExplainContext::PlanInsightsNotice(_) => {
1648 RequireLinearization::Required
1649 }
1650 _ => RequireLinearization::NotRequired,
1651 }
1652 }
1653}
1654
/// A set of acquired write locks, keyed by collection, together with the
/// connection they were acquired for. Any guards still held are released when
/// this value is dropped.
#[derive(Debug)]
pub struct WriteLocks {
    /// Acquired lock guards, keyed by the collection they lock.
    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
    /// The connection the locks were acquired for; included in log messages.
    conn_id: ConnectionId,
}
1664
1665impl WriteLocks {
1666 pub fn builder(sources: impl IntoIterator<Item = CatalogItemId>) -> WriteLocksBuilder {
1671 let locks = sources.into_iter().map(|gid| (gid, None)).collect();
1672 WriteLocksBuilder { locks }
1673 }
1674
1675 pub fn validate(
1678 self,
1679 collections: impl Iterator<Item = CatalogItemId>,
1680 ) -> Result<Self, BTreeSet<CatalogItemId>> {
1681 let mut missing = BTreeSet::new();
1682 for collection in collections {
1683 if !self.locks.contains_key(&collection) {
1684 missing.insert(collection);
1685 }
1686 }
1687
1688 if missing.is_empty() {
1689 Ok(self)
1690 } else {
1691 drop(self);
1693 Err(missing)
1694 }
1695 }
1696}
1697
1698impl Drop for WriteLocks {
1699 fn drop(&mut self) {
1700 if !self.locks.is_empty() {
1702 tracing::info!(
1703 conn_id = %self.conn_id,
1704 locks = ?self.locks,
1705 "dropping write locks",
1706 );
1707 }
1708 }
1709}
1710
/// Accumulates write-lock guards for a set of collections before they are
/// turned into [`WriteLocks`] by [`WriteLocksBuilder::all_or_nothing`].
#[derive(Debug)]
pub struct WriteLocksBuilder {
    /// Every tracked collection: `Some(guard)` once its lock has been acquired,
    /// `None` while it is still missing.
    locks: BTreeMap<CatalogItemId, Option<tokio::sync::OwnedMutexGuard<()>>>,
}
1718
1719impl WriteLocksBuilder {
1720 pub fn insert_lock(&mut self, id: CatalogItemId, lock: tokio::sync::OwnedMutexGuard<()>) {
1722 self.locks.insert(id, Some(lock));
1723 }
1724
1725 pub fn all_or_nothing(self, conn_id: &ConnectionId) -> Result<WriteLocks, CatalogItemId> {
1730 let (locks, missing): (BTreeMap<_, _>, BTreeSet<_>) =
1731 self.locks
1732 .into_iter()
1733 .partition_map(|(gid, lock)| match lock {
1734 Some(lock) => itertools::Either::Left((gid, lock)),
1735 None => itertools::Either::Right(gid),
1736 });
1737
1738 match missing.iter().next() {
1739 None => {
1740 tracing::info!(%conn_id, ?locks, "acquired write locks");
1741 Ok(WriteLocks {
1742 locks,
1743 conn_id: conn_id.clone(),
1744 })
1745 }
1746 Some(gid) => {
1747 tracing::info!(?missing, "failed to acquire write locks");
1748 drop(locks);
1750 Err(*gid)
1751 }
1752 }
1753 }
1754}
1755
/// Write locks collected for a group commit. Guards from one or more
/// [`WriteLocks`] are merged in via `merge`, and any guards still held when
/// this value is dropped are logged.
#[derive(Debug, Default)]
pub(crate) struct GroupCommitWriteLocks {
    /// Lock guards, keyed by the collection they lock.
    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
}
1808
1809impl GroupCommitWriteLocks {
1810 pub fn merge(&mut self, mut locks: WriteLocks) {
1812 let existing = std::mem::take(&mut locks.locks);
1816 self.locks.extend(existing);
1817 }
1818
1819 pub fn missing_locks(
1821 &self,
1822 writes: impl Iterator<Item = CatalogItemId>,
1823 ) -> BTreeSet<CatalogItemId> {
1824 let mut missing = BTreeSet::new();
1825 for write in writes {
1826 if !self.locks.contains_key(&write) {
1827 missing.insert(write);
1828 }
1829 }
1830 missing
1831 }
1832}
1833
1834impl Drop for GroupCommitWriteLocks {
1835 fn drop(&mut self) {
1836 if !self.locks.is_empty() {
1837 tracing::info!(
1838 locks = ?self.locks,
1839 "dropping group commit write locks",
1840 );
1841 }
1842 }
1843}