1#![warn(missing_docs)]
13
14use std::collections::btree_map::Entry;
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Debug;
17use std::future::Future;
18use std::mem;
19use std::net::IpAddr;
20use std::pin::Pin;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use derivative::Derivative;
25use itertools::Itertools;
26use mz_adapter_types::connection::ConnectionId;
27use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
28use mz_controller_types::ClusterId;
29use mz_ore::metrics::{MetricsFutureExt, MetricsRegistry};
30use mz_ore::now::{EpochMillis, NowFn};
31use mz_pgwire_common::Format;
32use mz_repr::role_id::RoleId;
33use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
34use mz_repr::{CatalogItemId, Datum, Row, RowIterator, SqlScalarType, TimestampManipulation};
35use mz_sql::ast::{AstInfo, Raw, Statement, TransactionAccessMode};
36use mz_sql::plan::{Params, PlanContext, QueryWhen, StatementDesc};
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::user::{
39 INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER, RoleMetadata, SYSTEM_USER, User,
40};
41use mz_sql::session::vars::IsolationLevel;
42pub use mz_sql::session::vars::{
43 DEFAULT_DATABASE_NAME, EndTransactionAction, SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION,
44 SERVER_PATCH_VERSION, SessionVars, Var,
45};
46use mz_sql_parser::ast::TransactionIsolationLevel;
47use mz_storage_client::client::TableData;
48use mz_storage_types::sources::Timeline;
49use qcell::{QCell, QCellOwner};
50use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
51use tokio::sync::watch;
52use uuid::Uuid;
53
54use crate::catalog::CatalogState;
55use crate::client::RecordFirstRowStream;
56use crate::coord::appends::BuiltinTableAppendNotify;
57use crate::coord::in_memory_oracle::InMemoryTimestampOracle;
58use crate::coord::peek::PeekResponseUnary;
59use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
60use crate::coord::{Coordinator, ExplainContext};
61use crate::error::AdapterError;
62use crate::metrics::{Metrics, SessionMetrics};
63use crate::statement_logging::PreparedStatementLoggingInfo;
64use crate::{AdapterNotice, ExecuteContext};
65use mz_catalog::durable::Snapshot;
66
67const DUMMY_CONNECTION_ID: ConnectionId = ConnectionId::Static(0);
68
69#[derive(Derivative)]
71#[derivative(Debug)]
72pub struct Session<T = mz_repr::Timestamp>
73where
74 T: Debug + Clone + Send + Sync,
75{
76 conn_id: ConnectionId,
77 uuid: Uuid,
80 prepared_statements: BTreeMap<String, PreparedStatement>,
81 portals: BTreeMap<String, Portal>,
82 transaction: TransactionStatus<T>,
83 pcx: Option<PlanContext>,
84 metrics: SessionMetrics,
85 #[derivative(Debug = "ignore")]
86 builtin_updates: Option<BuiltinTableAppendNotify>,
87
88 role_metadata: Option<RoleMetadata>,
101 client_ip: Option<IpAddr>,
102 vars: SessionVars,
103 notices_tx: mpsc::UnboundedSender<AdapterNotice>,
104 notices_rx: mpsc::UnboundedReceiver<AdapterNotice>,
105 next_transaction_id: TransactionId,
106 secret_key: u32,
107 external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
108 #[derivative(Debug = "ignore")]
120 qcell_owner: QCellOwner,
121 session_oracles: BTreeMap<Timeline, InMemoryTimestampOracle<T, NowFn<T>>>,
122 state_revision: u64,
128}
129
130impl<T> SessionMetadata for Session<T>
131where
132 T: Debug + Clone + Send + Sync,
133 T: TimestampManipulation,
134{
135 fn conn_id(&self) -> &ConnectionId {
136 &self.conn_id
137 }
138
139 fn client_ip(&self) -> Option<&IpAddr> {
140 self.client_ip.as_ref()
141 }
142
143 fn pcx(&self) -> &PlanContext {
144 &self
145 .transaction()
146 .inner()
147 .expect("no active transaction")
148 .pcx
149 }
150
151 fn role_metadata(&self) -> &RoleMetadata {
152 self.role_metadata
153 .as_ref()
154 .expect("role_metadata invariant violated")
155 }
156
157 fn vars(&self) -> &SessionVars {
158 &self.vars
159 }
160}
161
162#[derive(Debug)]
165pub struct SessionMeta {
166 conn_id: ConnectionId,
167 client_ip: Option<IpAddr>,
168 pcx: PlanContext,
169 role_metadata: RoleMetadata,
170 vars: SessionVars,
171}
172
173impl SessionMetadata for SessionMeta {
174 fn vars(&self) -> &SessionVars {
175 &self.vars
176 }
177
178 fn conn_id(&self) -> &ConnectionId {
179 &self.conn_id
180 }
181
182 fn client_ip(&self) -> Option<&IpAddr> {
183 self.client_ip.as_ref()
184 }
185
186 fn pcx(&self) -> &PlanContext {
187 &self.pcx
188 }
189
190 fn role_metadata(&self) -> &RoleMetadata {
191 &self.role_metadata
192 }
193}
194
195#[derive(Debug, Clone)]
197pub struct SessionConfig {
198 pub conn_id: ConnectionId,
202 pub uuid: Uuid,
207 pub client_ip: Option<IpAddr>,
209 pub user: String,
211 pub external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
214 pub helm_chart_version: Option<String>,
216}
217
218impl<T: TimestampManipulation> Session<T> {
219 pub(crate) fn new(
221 build_info: &'static BuildInfo,
222 config: SessionConfig,
223 metrics: SessionMetrics,
224 ) -> Session<T> {
225 assert_ne!(config.conn_id, DUMMY_CONNECTION_ID);
226 Self::new_internal(build_info, config, metrics)
227 }
228
229 pub fn meta(&self) -> SessionMeta {
232 SessionMeta {
233 conn_id: self.conn_id().clone(),
234 client_ip: self.client_ip().copied(),
235 pcx: self.pcx().clone(),
236 role_metadata: self.role_metadata().clone(),
237 vars: self.vars.clone(),
238 }
239
240 }
242
243 pub(crate) fn mint_logging<A: AstInfo>(
253 &self,
254 raw_sql: String,
255 stmt: Option<&Statement<A>>,
256 now: EpochMillis,
257 ) -> Arc<QCell<PreparedStatementLoggingInfo>> {
258 Arc::new(QCell::new(
259 &self.qcell_owner,
260 PreparedStatementLoggingInfo::still_to_log(
261 raw_sql,
262 stmt,
263 now,
264 "".to_string(),
265 self.uuid,
266 false,
267 ),
268 ))
269 }
270
271 pub(crate) fn qcell_ro<'a, T2: 'a>(&'a self, cell: &'a Arc<QCell<T2>>) -> &'a T2 {
272 self.qcell_owner.ro(&*cell)
273 }
274
275 pub(crate) fn qcell_rw<'a, T2: 'a>(&'a mut self, cell: &'a Arc<QCell<T2>>) -> &'a mut T2 {
276 self.qcell_owner.rw(&*cell)
277 }
278
279 pub fn uuid(&self) -> Uuid {
282 self.uuid
283 }
284
285 pub fn dummy() -> Session<T> {
290 let registry = MetricsRegistry::new();
291 let metrics = Metrics::register_into(®istry);
292 let metrics = metrics.session_metrics();
293 let mut dummy = Self::new_internal(
294 &DUMMY_BUILD_INFO,
295 SessionConfig {
296 conn_id: DUMMY_CONNECTION_ID,
297 uuid: Uuid::new_v4(),
298 user: SYSTEM_USER.name.clone(),
299 client_ip: None,
300 external_metadata_rx: None,
301 helm_chart_version: None,
302 },
303 metrics,
304 );
305 dummy.initialize_role_metadata(RoleId::User(0));
306 dummy
307 }
308
309 fn new_internal(
310 build_info: &'static BuildInfo,
311 SessionConfig {
312 conn_id,
313 uuid,
314 user,
315 client_ip,
316 mut external_metadata_rx,
317 helm_chart_version,
318 }: SessionConfig,
319 metrics: SessionMetrics,
320 ) -> Session<T> {
321 let (notices_tx, notices_rx) = mpsc::unbounded_channel();
322 let default_cluster = INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER.get(&user);
323 let user = User {
324 name: user,
325 internal_metadata: None,
326 external_metadata: external_metadata_rx
327 .as_mut()
328 .map(|rx| rx.borrow_and_update().clone()),
329 };
330 let mut vars = SessionVars::new_unchecked(build_info, user, helm_chart_version);
331 if let Some(default_cluster) = default_cluster {
332 vars.set_cluster(default_cluster.clone());
333 }
334 Session {
335 conn_id,
336 uuid,
337 transaction: TransactionStatus::Default,
338 pcx: None,
339 metrics,
340 builtin_updates: None,
341 prepared_statements: BTreeMap::new(),
342 portals: BTreeMap::new(),
343 role_metadata: None,
344 client_ip,
345 vars,
346 notices_tx,
347 notices_rx,
348 next_transaction_id: 0,
349 secret_key: rand::random(),
350 external_metadata_rx,
351 qcell_owner: QCellOwner::new(),
352 session_oracles: BTreeMap::new(),
353 state_revision: 0,
354 }
355 }
356
357 pub fn secret_key(&self) -> u32 {
359 self.secret_key
360 }
361
362 fn new_pcx(&self, mut wall_time: DateTime<Utc>) -> PlanContext {
363 if let Some(mock_time) = self.vars().unsafe_new_transaction_wall_time() {
364 wall_time = *mock_time;
365 }
366 PlanContext::new(wall_time)
367 }
368
369 pub fn start_transaction(
372 &mut self,
373 wall_time: DateTime<Utc>,
374 access: Option<TransactionAccessMode>,
375 isolation_level: Option<TransactionIsolationLevel>,
376 ) -> Result<(), AdapterError> {
377 if let Some(txn) = self.transaction.inner() {
379 let read_write_prohibited = match txn.ops {
383 TransactionOps::Peeks { .. } | TransactionOps::Subscribe => {
384 txn.access == Some(TransactionAccessMode::ReadOnly)
385 }
386 TransactionOps::None
387 | TransactionOps::Writes(_)
388 | TransactionOps::SingleStatement { .. }
389 | TransactionOps::DDL { .. } => false,
390 };
391
392 if read_write_prohibited && access == Some(TransactionAccessMode::ReadWrite) {
393 return Err(AdapterError::ReadWriteUnavailable);
394 }
395 }
396
397 match std::mem::take(&mut self.transaction) {
398 TransactionStatus::Default => {
399 let id = self.next_transaction_id;
400 self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
401 self.transaction = TransactionStatus::InTransaction(Transaction {
402 pcx: self.new_pcx(wall_time),
403 ops: TransactionOps::None,
404 write_lock_guards: None,
405 access,
406 id,
407 });
408 }
409 TransactionStatus::Started(mut txn)
410 | TransactionStatus::InTransactionImplicit(mut txn)
411 | TransactionStatus::InTransaction(mut txn) => {
412 if access.is_some() {
413 txn.access = access;
414 }
415 self.transaction = TransactionStatus::InTransaction(txn);
416 }
417 TransactionStatus::Failed(_) => unreachable!(),
418 };
419
420 if let Some(isolation_level) = isolation_level {
421 self.vars
422 .set_local_transaction_isolation(isolation_level.into());
423 }
424
425 Ok(())
426 }
427
428 pub fn start_transaction_implicit(&mut self, wall_time: DateTime<Utc>, stmts: usize) {
431 if let TransactionStatus::Default = self.transaction {
432 let id = self.next_transaction_id;
433 self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
434 let txn = Transaction {
435 pcx: self.new_pcx(wall_time),
436 ops: TransactionOps::None,
437 write_lock_guards: None,
438 access: None,
439 id,
440 };
441 match stmts {
442 1 => self.transaction = TransactionStatus::Started(txn),
443 n if n > 1 => self.transaction = TransactionStatus::InTransactionImplicit(txn),
444 _ => {}
445 }
446 }
447 }
448
449 pub fn start_transaction_single_stmt(&mut self, wall_time: DateTime<Utc>) {
451 self.start_transaction_implicit(wall_time, 1);
452 }
453
454 #[must_use]
464 pub fn clear_transaction(&mut self) -> TransactionStatus<T> {
465 self.portals.clear();
466 self.pcx = None;
467 self.state_revision += 1;
468 mem::take(&mut self.transaction)
469 }
470
471 pub fn fail_transaction(mut self) -> Self {
473 match self.transaction {
474 TransactionStatus::Default => unreachable!(),
475 TransactionStatus::Started(txn)
476 | TransactionStatus::InTransactionImplicit(txn)
477 | TransactionStatus::InTransaction(txn) => {
478 self.transaction = TransactionStatus::Failed(txn);
479 }
480 TransactionStatus::Failed(_) => {}
481 };
482 self
483 }
484
485 pub fn transaction(&self) -> &TransactionStatus<T> {
487 &self.transaction
488 }
489
490 pub fn transaction_mut(&mut self) -> &mut TransactionStatus<T> {
492 &mut self.transaction
493 }
494
495 pub fn transaction_code(&self) -> TransactionCode {
497 self.transaction().into()
498 }
499
500 pub fn add_transaction_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
504 self.transaction.add_ops(add_ops)
505 }
506
507 pub fn retain_notice_transmitter(&self) -> UnboundedSender<AdapterNotice> {
509 self.notices_tx.clone()
510 }
511
512 pub fn add_notice(&self, notice: AdapterNotice) {
514 self.add_notices([notice])
515 }
516
517 pub fn add_notices(&self, notices: impl IntoIterator<Item = AdapterNotice>) {
519 for notice in notices {
520 let _ = self.notices_tx.send(notice);
521 }
522 }
523
524 pub async fn recv_notice(&mut self) -> AdapterNotice {
528 loop {
530 let notice = self
531 .notices_rx
532 .recv()
533 .await
534 .expect("Session also holds a sender, so recv won't ever return None");
535 match self.notice_filter(notice) {
536 Some(notice) => return notice,
537 None => continue,
538 }
539 }
540 }
541
542 pub fn drain_notices(&mut self) -> Vec<AdapterNotice> {
544 let mut notices = Vec::new();
545 while let Ok(notice) = self.notices_rx.try_recv() {
546 if let Some(notice) = self.notice_filter(notice) {
547 notices.push(notice);
548 }
549 }
550 notices
551 }
552
553 fn notice_filter(&self, notice: AdapterNotice) -> Option<AdapterNotice> {
555 let minimum_client_severity = self.vars.client_min_messages();
557 let sev = notice.severity();
558 if !minimum_client_severity.should_output_to_client(&sev) {
559 return None;
560 }
561 if let AdapterNotice::ClusterReplicaStatusChanged { cluster, .. } = ¬ice {
563 if cluster != self.vars.cluster() {
564 return None;
565 }
566 }
567 Some(notice)
568 }
569
570 pub fn clear_transaction_ops(&mut self) {
573 if let Some(txn) = self.transaction.inner_mut() {
574 txn.ops = TransactionOps::None;
575 }
576 }
577
578 pub fn take_transaction_timestamp_context(&mut self) -> Option<TimestampContext<T>> {
583 if let Some(Transaction { ops, .. }) = self.transaction.inner_mut() {
584 if let TransactionOps::Peeks { .. } = ops {
585 let ops = std::mem::take(ops);
586 Some(
587 ops.timestamp_determination()
588 .expect("checked above")
589 .timestamp_context,
590 )
591 } else {
592 None
593 }
594 } else {
595 None
596 }
597 }
598
599 pub fn get_transaction_timestamp_determination(&self) -> Option<TimestampDetermination<T>> {
604 match self.transaction.inner() {
605 Some(Transaction {
606 pcx: _,
607 ops: TransactionOps::Peeks { determination, .. },
608 write_lock_guards: _,
609 access: _,
610 id: _,
611 }) => Some(determination.clone()),
612 _ => None,
613 }
614 }
615
616 pub fn contains_read_timestamp(&self) -> bool {
618 matches!(
619 self.transaction.inner(),
620 Some(Transaction {
621 pcx: _,
622 ops: TransactionOps::Peeks {
623 determination: TimestampDetermination {
624 timestamp_context: TimestampContext::TimelineTimestamp { .. },
625 ..
626 },
627 ..
628 },
629 write_lock_guards: _,
630 access: _,
631 id: _,
632 })
633 )
634 }
635
636 pub fn set_prepared_statement(
638 &mut self,
639 name: String,
640 stmt: Option<Statement<Raw>>,
641 raw_sql: String,
642 desc: StatementDesc,
643 state_revision: StateRevision,
644 now: EpochMillis,
645 ) {
646 let logging = PreparedStatementLoggingInfo::still_to_log(
647 raw_sql,
648 stmt.as_ref(),
649 now,
650 name.clone(),
651 self.uuid,
652 false,
653 );
654 let statement = PreparedStatement {
655 stmt,
656 desc,
657 state_revision,
658 logging: Arc::new(QCell::new(&self.qcell_owner, logging)),
659 };
660 self.prepared_statements.insert(name, statement);
661 }
662
663 pub fn remove_prepared_statement(&mut self, name: &str) -> bool {
667 self.prepared_statements.remove(name).is_some()
668 }
669
670 pub fn remove_all_prepared_statements(&mut self) {
672 self.prepared_statements.clear();
673 }
674
675 pub fn get_prepared_statement_unverified(&self, name: &str) -> Option<&PreparedStatement> {
680 self.prepared_statements.get(name)
681 }
682
683 pub fn get_prepared_statement_mut_unverified(
688 &mut self,
689 name: &str,
690 ) -> Option<&mut PreparedStatement> {
691 self.prepared_statements.get_mut(name)
692 }
693
694 pub fn prepared_statements(&self) -> &BTreeMap<String, PreparedStatement> {
696 &self.prepared_statements
697 }
698
699 pub fn portals(&self) -> &BTreeMap<String, Portal> {
701 &self.portals
702 }
703
704 pub fn set_portal(
714 &mut self,
715 portal_name: String,
716 desc: StatementDesc,
717 stmt: Option<Statement<Raw>>,
718 logging: Arc<QCell<PreparedStatementLoggingInfo>>,
719 params: Vec<(Datum, SqlScalarType)>,
720 result_formats: Vec<Format>,
721 state_revision: StateRevision,
722 ) -> Result<(), AdapterError> {
723 if !portal_name.is_empty() && self.portals.contains_key(&portal_name) {
725 return Err(AdapterError::DuplicateCursor(portal_name));
726 }
727 self.state_revision += 1;
728 let param_types = desc.param_types.clone();
729 self.portals.insert(
730 portal_name,
731 Portal {
732 stmt: stmt.map(Arc::new),
733 desc,
734 state_revision,
735 parameters: Params {
736 datums: Row::pack(params.iter().map(|(d, _t)| d)),
737 execute_types: params.into_iter().map(|(_d, t)| t).collect(),
738 expected_types: param_types,
739 },
740 result_formats,
741 state: PortalState::NotStarted,
742 logging,
743 lifecycle_timestamps: None,
744 },
745 );
746 Ok(())
747 }
748
749 pub fn remove_portal(&mut self, portal_name: &str) -> bool {
753 self.state_revision += 1;
754 self.portals.remove(portal_name).is_some()
755 }
756
757 pub fn get_portal_unverified(&self, portal_name: &str) -> Option<&Portal> {
761 self.portals.get(portal_name)
762 }
763
764 pub fn get_portal_unverified_mut(&mut self, portal_name: &str) -> Option<PortalRefMut<'_>> {
771 self.portals.get_mut(portal_name).map(|p| PortalRefMut {
772 stmt: &p.stmt,
773 desc: &p.desc,
774 state_revision: &mut p.state_revision,
775 parameters: &mut p.parameters,
776 result_formats: &mut p.result_formats,
777 logging: &mut p.logging,
778 state: &mut p.state,
779 lifecycle_timestamps: &mut p.lifecycle_timestamps,
780 })
781 }
782
783 pub fn create_new_portal(
785 &mut self,
786 stmt: Option<Statement<Raw>>,
787 logging: Arc<QCell<PreparedStatementLoggingInfo>>,
788 desc: StatementDesc,
789 parameters: Params,
790 result_formats: Vec<Format>,
791 state_revision: StateRevision,
792 ) -> Result<String, AdapterError> {
793 self.state_revision += 1;
794
795 for i in 0usize.. {
797 let name = format!("<unnamed portal {}>", i);
798 match self.portals.entry(name.clone()) {
799 Entry::Occupied(_) => continue,
800 Entry::Vacant(entry) => {
801 entry.insert(Portal {
802 stmt: stmt.map(Arc::new),
803 desc,
804 state_revision,
805 parameters,
806 result_formats,
807 state: PortalState::NotStarted,
808 logging,
809 lifecycle_timestamps: None,
810 });
811 return Ok(name);
812 }
813 }
814 }
815
816 coord_bail!("unable to create a new portal");
817 }
818
819 pub fn reset(&mut self) {
822 let _ = self.clear_transaction();
823 self.prepared_statements.clear();
824 self.vars.reset_all();
825 }
826
827 pub fn application_name(&self) -> &str {
831 self.vars.application_name()
832 }
833
834 pub fn vars(&self) -> &SessionVars {
836 &self.vars
837 }
838
839 pub fn vars_mut(&mut self) -> &mut SessionVars {
841 &mut self.vars
842 }
843
844 pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
850 self.transaction.try_grant_write_locks(guards)
851 }
852
853 pub fn apply_external_metadata_updates(&mut self) {
855 let Some(rx) = &mut self.external_metadata_rx else {
857 return;
858 };
859
860 if !rx.has_changed().unwrap_or(false) {
862 return;
863 }
864
865 let metadata = rx.borrow_and_update().clone();
868 self.vars.set_external_user_metadata(metadata);
869 }
870
871 pub fn apply_internal_user_metadata(&mut self, metadata: InternalUserMetadata) {
873 self.vars.set_internal_user_metadata(metadata);
874 }
875
876 pub fn initialize_role_metadata(&mut self, role_id: RoleId) {
878 self.role_metadata = Some(RoleMetadata::new(role_id));
879 }
880
881 pub fn ensure_timestamp_oracle(
884 &mut self,
885 timeline: Timeline,
886 ) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
887 self.session_oracles
888 .entry(timeline)
889 .or_insert_with(|| InMemoryTimestampOracle::new(T::minimum(), NowFn::from(T::minimum)))
890 }
891
892 pub fn ensure_local_timestamp_oracle(&mut self) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
895 self.ensure_timestamp_oracle(Timeline::EpochMilliseconds)
896 }
897
898 pub fn get_timestamp_oracle(
900 &self,
901 timeline: &Timeline,
902 ) -> Option<&InMemoryTimestampOracle<T, NowFn<T>>> {
903 self.session_oracles.get(timeline)
904 }
905
906 pub fn apply_write(&mut self, timestamp: T) {
909 if self.vars().transaction_isolation() == &IsolationLevel::StrongSessionSerializable {
910 self.ensure_local_timestamp_oracle().apply_write(timestamp);
911 }
912 }
913
914 pub fn metrics(&self) -> &SessionMetrics {
916 &self.metrics
917 }
918
919 pub fn set_builtin_table_updates(&mut self, fut: BuiltinTableAppendNotify) {
921 let prev = self.builtin_updates.replace(fut);
922 mz_ore::soft_assert_or_log!(prev.is_none(), "replacing old builtin table notify");
923 }
924
925 pub fn clear_builtin_table_updates(&mut self) -> Option<impl Future<Output = ()> + 'static> {
928 if let Some(fut) = self.builtin_updates.take() {
929 let histogram = self
931 .metrics()
932 .session_startup_table_writes_seconds()
933 .clone();
934 Some(async move {
935 fut.wall_time().observe(histogram).await;
936 })
937 } else {
938 None
939 }
940 }
941
942 pub fn state_revision(&self) -> u64 {
945 self.state_revision
946 }
947}
948
949#[derive(Derivative, Clone)]
951#[derivative(Debug)]
952pub struct PreparedStatement {
953 stmt: Option<Statement<Raw>>,
954 desc: StatementDesc,
955 pub state_revision: StateRevision,
957 #[derivative(Debug = "ignore")]
958 logging: Arc<QCell<PreparedStatementLoggingInfo>>,
959}
960
961impl PreparedStatement {
962 pub fn stmt(&self) -> Option<&Statement<Raw>> {
965 self.stmt.as_ref()
966 }
967
968 pub fn desc(&self) -> &StatementDesc {
970 &self.desc
971 }
972
973 pub fn logging(&self) -> &Arc<QCell<PreparedStatementLoggingInfo>> {
975 &self.logging
976 }
977}
978
979#[derive(Derivative)]
981#[derivative(Debug)]
982pub struct Portal {
983 pub stmt: Option<Arc<Statement<Raw>>>,
985 pub desc: StatementDesc,
987 pub state_revision: StateRevision,
989 pub parameters: Params,
991 pub result_formats: Vec<Format>,
993 #[derivative(Debug = "ignore")]
995 pub logging: Arc<QCell<PreparedStatementLoggingInfo>>,
996 #[derivative(Debug = "ignore")]
998 pub state: PortalState,
999 pub lifecycle_timestamps: Option<LifecycleTimestamps>,
1001}
1002
1003pub struct PortalRefMut<'a> {
1008 pub stmt: &'a Option<Arc<Statement<Raw>>>,
1010 pub desc: &'a StatementDesc,
1012 pub state_revision: &'a mut StateRevision,
1014 pub parameters: &'a mut Params,
1016 pub result_formats: &'a mut Vec<Format>,
1018 pub logging: &'a mut Arc<QCell<PreparedStatementLoggingInfo>>,
1020 pub state: &'a mut PortalState,
1022 pub lifecycle_timestamps: &'a mut Option<LifecycleTimestamps>,
1024}
1025
/// A pair of revision counters recorded on prepared statements and portals.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct StateRevision {
    /// Revision of the catalog.
    pub catalog_revision: u64,
    /// Revision of the session state.
    pub session_state_revision: u64,
}
1036
1037pub enum PortalState {
1039 NotStarted,
1041 InProgress(Option<InProgressRows>),
1044 Completed(Option<String>),
1048}
1049
1050pub struct InProgressRows {
1052 pub current: Option<Box<dyn RowIterator + Send + Sync>>,
1054 pub remaining: RecordFirstRowStream,
1056}
1057
1058impl InProgressRows {
1059 pub fn new(remaining: RecordFirstRowStream) -> Self {
1061 Self {
1062 current: None,
1063 remaining,
1064 }
1065 }
1066
1067 pub fn no_more_rows(&self) -> bool {
1070 self.remaining.no_more_rows && self.current.is_none()
1071 }
1072}
1073
1074pub type RowBatchStream = UnboundedReceiver<PeekResponseUnary>;
1076
1077#[derive(Debug, Clone)]
1080pub struct LifecycleTimestamps {
1081 pub received: EpochMillis,
1086}
1087
1088impl LifecycleTimestamps {
1089 pub fn new(received: EpochMillis) -> Self {
1091 Self { received }
1092 }
1093}
1094
1095#[derive(Debug)]
1099pub enum TransactionStatus<T> {
1100 Default,
1102 Started(Transaction<T>),
1112 InTransaction(Transaction<T>),
1114 InTransactionImplicit(Transaction<T>),
1117 Failed(Transaction<T>),
1119}
1120
1121impl<T: TimestampManipulation> TransactionStatus<T> {
1122 pub fn into_ops_and_lock_guard(self) -> (Option<TransactionOps<T>>, Option<WriteLocks>) {
1124 match self {
1125 TransactionStatus::Default | TransactionStatus::Failed(_) => (None, None),
1126 TransactionStatus::Started(txn)
1127 | TransactionStatus::InTransaction(txn)
1128 | TransactionStatus::InTransactionImplicit(txn) => {
1129 (Some(txn.ops), txn.write_lock_guards)
1130 }
1131 }
1132 }
1133
1134 pub fn inner(&self) -> Option<&Transaction<T>> {
1136 match self {
1137 TransactionStatus::Default => None,
1138 TransactionStatus::Started(txn)
1139 | TransactionStatus::InTransaction(txn)
1140 | TransactionStatus::InTransactionImplicit(txn)
1141 | TransactionStatus::Failed(txn) => Some(txn),
1142 }
1143 }
1144
1145 pub fn inner_mut(&mut self) -> Option<&mut Transaction<T>> {
1147 match self {
1148 TransactionStatus::Default => None,
1149 TransactionStatus::Started(txn)
1150 | TransactionStatus::InTransaction(txn)
1151 | TransactionStatus::InTransactionImplicit(txn)
1152 | TransactionStatus::Failed(txn) => Some(txn),
1153 }
1154 }
1155
1156 pub fn is_ddl(&self) -> bool {
1158 match self {
1159 TransactionStatus::Default => false,
1160 TransactionStatus::Started(txn)
1161 | TransactionStatus::InTransaction(txn)
1162 | TransactionStatus::InTransactionImplicit(txn)
1163 | TransactionStatus::Failed(txn) => {
1164 matches!(txn.ops, TransactionOps::DDL { .. })
1165 }
1166 }
1167 }
1168
1169 pub fn is_implicit(&self) -> bool {
1172 match self {
1173 TransactionStatus::Started(_) | TransactionStatus::InTransactionImplicit(_) => true,
1174 TransactionStatus::Default
1175 | TransactionStatus::InTransaction(_)
1176 | TransactionStatus::Failed(_) => false,
1177 }
1178 }
1179
1180 pub fn is_in_multi_statement_transaction(&self) -> bool {
1182 match self {
1183 TransactionStatus::InTransaction(_) | TransactionStatus::InTransactionImplicit(_) => {
1184 true
1185 }
1186 TransactionStatus::Default
1187 | TransactionStatus::Started(_)
1188 | TransactionStatus::Failed(_) => false,
1189 }
1190 }
1191
1192 pub fn in_immediate_multi_stmt_txn(&self, when: &QueryWhen) -> bool {
1194 self.is_in_multi_statement_transaction() && when == &QueryWhen::Immediately
1195 }
1196
1197 pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1206 match self {
1207 TransactionStatus::Default => panic!("cannot grant write lock to txn not yet started"),
1208 TransactionStatus::Started(txn)
1209 | TransactionStatus::InTransaction(txn)
1210 | TransactionStatus::InTransactionImplicit(txn)
1211 | TransactionStatus::Failed(txn) => txn.try_grant_write_locks(guards),
1212 }
1213 }
1214
1215 pub fn write_locks(&self) -> Option<&WriteLocks> {
1217 match self {
1218 TransactionStatus::Default => None,
1219 TransactionStatus::Started(txn)
1220 | TransactionStatus::InTransaction(txn)
1221 | TransactionStatus::InTransactionImplicit(txn)
1222 | TransactionStatus::Failed(txn) => txn.write_lock_guards.as_ref(),
1223 }
1224 }
1225
1226 pub fn timeline(&self) -> Option<Timeline> {
1228 match self {
1229 TransactionStatus::Default => None,
1230 TransactionStatus::Started(txn)
1231 | TransactionStatus::InTransaction(txn)
1232 | TransactionStatus::InTransactionImplicit(txn)
1233 | TransactionStatus::Failed(txn) => txn.timeline(),
1234 }
1235 }
1236
1237 pub fn cluster(&self) -> Option<ClusterId> {
1239 match self {
1240 TransactionStatus::Default => None,
1241 TransactionStatus::Started(txn)
1242 | TransactionStatus::InTransaction(txn)
1243 | TransactionStatus::InTransactionImplicit(txn)
1244 | TransactionStatus::Failed(txn) => txn.cluster(),
1245 }
1246 }
1247
1248 pub fn catalog_state(&self) -> Option<&CatalogState> {
1250 match self.inner() {
1251 Some(Transaction {
1252 ops: TransactionOps::DDL { state, .. },
1253 ..
1254 }) => Some(state),
1255 _ => None,
1256 }
1257 }
1258
1259 pub fn contains_ops(&self) -> bool {
1261 match self.inner() {
1262 Some(txn) => txn.contains_ops(),
1263 None => false,
1264 }
1265 }
1266
1267 pub fn allows_writes(&self) -> bool {
1271 match self {
1272 TransactionStatus::Started(Transaction { ops, access, .. })
1273 | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1274 | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1275 match ops {
1276 TransactionOps::None => access != &Some(TransactionAccessMode::ReadOnly),
1277 TransactionOps::Peeks { determination, .. } => {
1278 !determination.timestamp_context.contains_timestamp()
1282 }
1283 TransactionOps::Subscribe => false,
1284 TransactionOps::Writes(_) => true,
1285 TransactionOps::SingleStatement { .. } => false,
1286 TransactionOps::DDL { .. } => false,
1287 }
1288 }
1289 TransactionStatus::Default | TransactionStatus::Failed(_) => {
1290 unreachable!()
1291 }
1292 }
1293 }
1294
/// Merges `add_ops` into the operations of the current transaction.
///
/// Which combinations are accepted depends on the operations the
/// transaction already holds; see the comments on the individual arms.
///
/// # Errors
///
/// Returns an `AdapterError` when `add_ops` is incompatible with the
/// transaction: `ReadOnlyTransaction`, `SubscribeOnlyTransaction`,
/// `WriteOnlyTransaction`, `SingleStatementTransaction`,
/// `DDLOnlyTransaction`, or `DDLTransactionRace`.
///
/// # Panics
///
/// Panics in the default and failed states, when peeks are added for a
/// different cluster than the existing peeks, or when two timeline
/// timestamps disagree in timeline or chosen timestamp.
pub fn add_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
    match self {
        TransactionStatus::Started(Transaction { ops, access, .. })
        | TransactionStatus::InTransaction(Transaction { ops, access, .. })
        | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
            match ops {
                // Nothing recorded yet: adopt the new ops, unless they are
                // writes and the transaction is read-only.
                TransactionOps::None => {
                    if matches!(access, Some(TransactionAccessMode::ReadOnly))
                        && matches!(add_ops, TransactionOps::Writes(_))
                    {
                        return Err(AdapterError::ReadOnlyTransaction);
                    }
                    *ops = add_ops;
                }
                TransactionOps::Peeks {
                    determination,
                    cluster_id,
                    requires_linearization,
                } => match add_ops {
                    TransactionOps::Peeks {
                        determination: add_timestamp_determination,
                        cluster_id: add_cluster_id,
                        requires_linearization: add_requires_linearization,
                    } => {
                        // All peeks of a transaction must target one cluster.
                        assert_eq!(*cluster_id, add_cluster_id);
                        match (
                            &determination.timestamp_context,
                            &add_timestamp_determination.timestamp_context,
                        ) {
                            // Both sides have a timeline timestamp: they must agree.
                            (
                                TimestampContext::TimelineTimestamp {
                                    timeline: txn_timeline,
                                    chosen_ts: txn_ts,
                                    oracle_ts: _,
                                },
                                TimestampContext::TimelineTimestamp {
                                    timeline: add_timeline,
                                    chosen_ts: add_ts,
                                    oracle_ts: _,
                                },
                            ) => {
                                assert_eq!(txn_timeline, add_timeline);
                                assert_eq!(txn_ts, add_ts);
                            }
                            // The transaction has no timestamp yet: adopt the
                            // incoming determination.
                            (TimestampContext::NoTimestamp, _) => {
                                *determination = add_timestamp_determination
                            }
                            // The incoming peek has no timestamp: keep ours.
                            (_, TimestampContext::NoTimestamp) => {}
                        };
                        // Linearization is only ever upgraded to required.
                        if matches!(requires_linearization, RequireLinearization::NotRequired)
                            && matches!(
                                add_requires_linearization,
                                RequireLinearization::Required
                            )
                        {
                            *requires_linearization = add_requires_linearization;
                        }
                    }
                    // Writes may replace peeks only while the peeks carry no
                    // timestamp.
                    writes @ TransactionOps::Writes(..)
                        if !determination.timestamp_context.contains_timestamp() =>
                    {
                        *ops = writes;
                    }
                    _ => return Err(AdapterError::ReadOnlyTransaction),
                },
                // Nothing can be added after a subscribe.
                TransactionOps::Subscribe => {
                    return Err(AdapterError::SubscribeOnlyTransaction);
                }
                TransactionOps::Writes(txn_writes) => match add_ops {
                    TransactionOps::Writes(mut add_writes) => {
                        // A read-only transaction must not have reached the
                        // write state in the first place.
                        assert!(!matches!(access, Some(TransactionAccessMode::ReadOnly)));
                        txn_writes.append(&mut add_writes);
                    }
                    // Peeks without a timestamp are tolerated and leave the
                    // writes untouched.
                    TransactionOps::Peeks { determination, .. }
                        if !determination.timestamp_context.contains_timestamp() => {}
                    _ => {
                        return Err(AdapterError::WriteOnlyTransaction);
                    }
                },
                // Nothing can be added to a single-statement transaction.
                TransactionOps::SingleStatement { .. } => {
                    return Err(AdapterError::SingleStatementTransaction);
                }
                TransactionOps::DDL {
                    ops: og_ops,
                    revision: og_revision,
                    state: og_state,
                    side_effects,
                    snapshot: og_snapshot,
                } => match add_ops {
                    TransactionOps::DDL {
                        ops: new_ops,
                        revision: new_revision,
                        side_effects: mut net_new_side_effects,
                        state: new_state,
                        snapshot: new_snapshot,
                    } => {
                        // The revisions must match, otherwise the DDL
                        // transaction raced with another change.
                        if *og_revision != new_revision {
                            return Err(AdapterError::DDLTransactionRace);
                        }
                        // Non-empty new ops overwrite (not extend) the old
                        // ops, state, and snapshot.
                        if !new_ops.is_empty() {
                            *og_ops = new_ops;
                            *og_state = new_state;
                            *og_snapshot = new_snapshot;
                        }
                        // Side effects, in contrast, accumulate.
                        side_effects.append(&mut net_new_side_effects);
                    }
                    _ => return Err(AdapterError::DDLOnlyTransaction),
                },
            }
        }
        TransactionStatus::Default | TransactionStatus::Failed(_) => {
            unreachable!()
        }
    }
    Ok(())
}
1431}
1432
/// Identifier of a transaction within a session. Assigned from a wrapping
/// per-session counter.
pub type TransactionId = u64;
1435
1436impl<T> Default for TransactionStatus<T> {
1437 fn default() -> Self {
1438 TransactionStatus::Default
1439 }
1440}
1441
1442#[derive(Debug)]
1444pub struct Transaction<T> {
1445 pub pcx: PlanContext,
1447 pub ops: TransactionOps<T>,
1449 pub id: TransactionId,
1454 write_lock_guards: Option<WriteLocks>,
1456 access: Option<TransactionAccessMode>,
1458}
1459
1460impl<T> Transaction<T> {
1461 fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1464 match &mut self.write_lock_guards {
1465 Some(existing) => Err(existing),
1466 locks @ None => {
1467 *locks = Some(guards);
1468 Ok(())
1469 }
1470 }
1471 }
1472
1473 fn timeline(&self) -> Option<Timeline> {
1475 match &self.ops {
1476 TransactionOps::Peeks {
1477 determination:
1478 TimestampDetermination {
1479 timestamp_context: TimestampContext::TimelineTimestamp { timeline, .. },
1480 ..
1481 },
1482 ..
1483 } => Some(timeline.clone()),
1484 TransactionOps::Peeks { .. }
1485 | TransactionOps::None
1486 | TransactionOps::Subscribe
1487 | TransactionOps::Writes(_)
1488 | TransactionOps::SingleStatement { .. }
1489 | TransactionOps::DDL { .. } => None,
1490 }
1491 }
1492
1493 pub fn cluster(&self) -> Option<ClusterId> {
1495 match &self.ops {
1496 TransactionOps::Peeks { cluster_id, .. } => Some(cluster_id.clone()),
1497 TransactionOps::None
1498 | TransactionOps::Subscribe
1499 | TransactionOps::Writes(_)
1500 | TransactionOps::SingleStatement { .. }
1501 | TransactionOps::DDL { .. } => None,
1502 }
1503 }
1504
1505 fn contains_ops(&self) -> bool {
1507 !matches!(self.ops, TransactionOps::None)
1508 }
1509}
1510
/// Coarse status code describing where a transaction stands.
#[derive(Clone, Copy, Debug)]
pub enum TransactionCode {
    /// No transaction is in progress.
    Idle,
    /// A transaction is in progress.
    InTransaction,
    /// The transaction has failed.
    Failed,
}
1521
1522impl From<TransactionCode> for u8 {
1523 fn from(code: TransactionCode) -> Self {
1524 match code {
1525 TransactionCode::Idle => b'I',
1526 TransactionCode::InTransaction => b'T',
1527 TransactionCode::Failed => b'E',
1528 }
1529 }
1530}
1531
1532impl From<TransactionCode> for String {
1533 fn from(code: TransactionCode) -> Self {
1534 char::from(u8::from(code)).to_string()
1535 }
1536}
1537
1538impl<T> From<&TransactionStatus<T>> for TransactionCode {
1539 fn from(status: &TransactionStatus<T>) -> TransactionCode {
1541 match status {
1542 TransactionStatus::Default => TransactionCode::Idle,
1543 TransactionStatus::Started(_) => TransactionCode::InTransaction,
1544 TransactionStatus::InTransaction(_) => TransactionCode::InTransaction,
1545 TransactionStatus::InTransactionImplicit(_) => TransactionCode::InTransaction,
1546 TransactionStatus::Failed(_) => TransactionCode::Failed,
1547 }
1548 }
1549}
1550
1551#[derive(Derivative)]
1557#[derivative(Debug)]
1558pub enum TransactionOps<T> {
1559 None,
1562 Peeks {
1567 determination: TimestampDetermination<T>,
1569 cluster_id: ClusterId,
1571 requires_linearization: RequireLinearization,
1573 },
1574 Subscribe,
1576 Writes(Vec<WriteOp>),
1579 SingleStatement {
1581 stmt: Arc<Statement<Raw>>,
1583 params: mz_sql::plan::Params,
1585 },
1586 DDL {
1590 ops: Vec<crate::catalog::Op>,
1592 state: CatalogState,
1594 #[derivative(Debug = "ignore")]
1596 side_effects: Vec<
1597 Box<
1598 dyn for<'a> FnOnce(
1599 &'a mut Coordinator,
1600 Option<&'a mut ExecuteContext>,
1601 ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
1602 + Send
1603 + Sync,
1604 >,
1605 >,
1606 revision: u64,
1608 snapshot: Option<Snapshot>,
1613 },
1614}
1615
1616impl<T> TransactionOps<T> {
1617 fn timestamp_determination(self) -> Option<TimestampDetermination<T>> {
1618 match self {
1619 TransactionOps::Peeks { determination, .. } => Some(determination),
1620 TransactionOps::None
1621 | TransactionOps::Subscribe
1622 | TransactionOps::Writes(_)
1623 | TransactionOps::SingleStatement { .. }
1624 | TransactionOps::DDL { .. } => None,
1625 }
1626 }
1627}
1628
1629impl<T> Default for TransactionOps<T> {
1630 fn default() -> Self {
1631 Self::None
1632 }
1633}
1634
1635#[derive(Debug, Clone, PartialEq)]
1637pub struct WriteOp {
1638 pub id: CatalogItemId,
1640 pub rows: TableData,
1642}
1643
/// Whether an operation requires linearization.
#[derive(Debug)]
pub enum RequireLinearization {
    /// Linearization is required.
    Required,
    /// Linearization is not required.
    NotRequired,
}
1652
1653impl From<&ExplainContext> for RequireLinearization {
1654 fn from(ctx: &ExplainContext) -> Self {
1655 match ctx {
1656 ExplainContext::None | ExplainContext::PlanInsightsNotice(_) => {
1657 RequireLinearization::Required
1658 }
1659 _ => RequireLinearization::NotRequired,
1660 }
1661 }
1662}
1663
1664#[derive(Debug)]
1668pub struct WriteLocks {
1669 locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
1670 conn_id: ConnectionId,
1672}
1673
1674impl WriteLocks {
1675 pub fn builder(sources: impl IntoIterator<Item = CatalogItemId>) -> WriteLocksBuilder {
1680 let locks = sources.into_iter().map(|gid| (gid, None)).collect();
1681 WriteLocksBuilder { locks }
1682 }
1683
1684 pub fn validate(
1687 self,
1688 collections: impl Iterator<Item = CatalogItemId>,
1689 ) -> Result<Self, BTreeSet<CatalogItemId>> {
1690 let mut missing = BTreeSet::new();
1691 for collection in collections {
1692 if !self.locks.contains_key(&collection) {
1693 missing.insert(collection);
1694 }
1695 }
1696
1697 if missing.is_empty() {
1698 Ok(self)
1699 } else {
1700 drop(self);
1702 Err(missing)
1703 }
1704 }
1705}
1706
1707impl Drop for WriteLocks {
1708 fn drop(&mut self) {
1709 if !self.locks.is_empty() {
1711 tracing::info!(
1712 conn_id = %self.conn_id,
1713 locks = ?self.locks,
1714 "dropping write locks",
1715 );
1716 }
1717 }
1718}
1719
1720#[derive(Debug)]
1724pub struct WriteLocksBuilder {
1725 locks: BTreeMap<CatalogItemId, Option<tokio::sync::OwnedMutexGuard<()>>>,
1726}
1727
1728impl WriteLocksBuilder {
1729 pub fn insert_lock(&mut self, id: CatalogItemId, lock: tokio::sync::OwnedMutexGuard<()>) {
1731 self.locks.insert(id, Some(lock));
1732 }
1733
1734 pub fn all_or_nothing(self, conn_id: &ConnectionId) -> Result<WriteLocks, CatalogItemId> {
1739 let (locks, missing): (BTreeMap<_, _>, BTreeSet<_>) =
1740 self.locks
1741 .into_iter()
1742 .partition_map(|(gid, lock)| match lock {
1743 Some(lock) => itertools::Either::Left((gid, lock)),
1744 None => itertools::Either::Right(gid),
1745 });
1746
1747 match missing.iter().next() {
1748 None => {
1749 tracing::info!(%conn_id, ?locks, "acquired write locks");
1750 Ok(WriteLocks {
1751 locks,
1752 conn_id: conn_id.clone(),
1753 })
1754 }
1755 Some(gid) => {
1756 tracing::info!(?missing, "failed to acquire write locks");
1757 drop(locks);
1759 Err(*gid)
1760 }
1761 }
1762 }
1763}
1764
1765#[derive(Debug, Default)]
1814pub(crate) struct GroupCommitWriteLocks {
1815 locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
1816}
1817
1818impl GroupCommitWriteLocks {
1819 pub fn merge(&mut self, mut locks: WriteLocks) {
1821 let existing = std::mem::take(&mut locks.locks);
1825 self.locks.extend(existing);
1826 }
1827
1828 pub fn missing_locks(
1830 &self,
1831 writes: impl Iterator<Item = CatalogItemId>,
1832 ) -> BTreeSet<CatalogItemId> {
1833 let mut missing = BTreeSet::new();
1834 for write in writes {
1835 if !self.locks.contains_key(&write) {
1836 missing.insert(write);
1837 }
1838 }
1839 missing
1840 }
1841}
1842
1843impl Drop for GroupCommitWriteLocks {
1844 fn drop(&mut self) {
1845 if !self.locks.is_empty() {
1846 tracing::info!(
1847 locks = ?self.locks,
1848 "dropping group commit write locks",
1849 );
1850 }
1851 }
1852}