1#![warn(missing_docs)]
13
14use std::collections::btree_map::Entry;
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Debug;
17use std::future::Future;
18use std::mem;
19use std::net::IpAddr;
20use std::pin::Pin;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use derivative::Derivative;
25use itertools::Itertools;
26use mz_adapter_types::connection::ConnectionId;
27use mz_auth::AuthenticatorKind;
28use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
29use mz_controller_types::ClusterId;
30use mz_ore::metrics::{MetricsFutureExt, MetricsRegistry};
31use mz_ore::now::{EpochMillis, NowFn};
32use mz_pgwire_common::Format;
33use mz_repr::role_id::RoleId;
34use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
35use mz_repr::{CatalogItemId, Datum, Row, RowIterator, SqlScalarType, TimestampManipulation};
36use mz_sql::ast::{AstInfo, Raw, Statement, TransactionAccessMode};
37use mz_sql::plan::{Params, PlanContext, QueryWhen, StatementDesc};
38use mz_sql::session::metadata::SessionMetadata;
39use mz_sql::session::user::{
40 INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER, RoleMetadata, SYSTEM_USER, User,
41};
42use mz_sql::session::vars::IsolationLevel;
43pub use mz_sql::session::vars::{
44 DEFAULT_DATABASE_NAME, EndTransactionAction, SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION,
45 SERVER_PATCH_VERSION, SessionVars, Var,
46};
47use mz_sql_parser::ast::TransactionIsolationLevel;
48use mz_storage_client::client::TableData;
49use mz_storage_types::sources::Timeline;
50use qcell::{QCell, QCellOwner};
51use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
52use tokio::sync::watch;
53use uuid::Uuid;
54
55use crate::catalog::CatalogState;
56use crate::client::RecordFirstRowStream;
57use crate::coord::appends::BuiltinTableAppendNotify;
58use crate::coord::in_memory_oracle::InMemoryTimestampOracle;
59use crate::coord::peek::PeekResponseUnary;
60use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
61use crate::coord::{Coordinator, ExplainContext};
62use crate::error::AdapterError;
63use crate::metrics::{Metrics, SessionMetrics};
64use crate::statement_logging::PreparedStatementLoggingInfo;
65use crate::{AdapterNotice, ExecuteContext};
66use mz_catalog::durable::Snapshot;
67
/// Connection ID used by [`Session::dummy`]. [`Session::new`] asserts that sessions created
/// through it never use this ID.
const DUMMY_CONNECTION_ID: ConnectionId = ConnectionId::Static(0);
69
/// The state of a single client session: its prepared statements, portals, current
/// transaction, session variables, and notice channel.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Session<T = mz_repr::Timestamp>
where
    T: Debug + Clone + Send + Sync,
{
    /// ID of the connection this session belongs to.
    conn_id: ConnectionId,
    /// UUID supplied through [`SessionConfig`] when the session was created.
    uuid: Uuid,
    /// Prepared statements, keyed by name.
    prepared_statements: BTreeMap<String, PreparedStatement>,
    /// Portals, keyed by name. Emptied whenever the transaction is cleared.
    portals: BTreeMap<String, Portal>,
    /// Status of the current transaction.
    transaction: TransactionStatus<T>,
    /// NOTE(review): in the code visible here this is only ever set to `None`; planning
    /// contexts are read from the active transaction instead. Confirm whether it is still used.
    pcx: Option<PlanContext>,
    /// Metrics handles for this session.
    metrics: SessionMetrics,
    /// Pending builtin table append notification, installed by `set_builtin_table_updates` and
    /// consumed by `clear_builtin_table_updates`.
    #[derivative(Debug = "ignore")]
    builtin_updates: Option<BuiltinTableAppendNotify>,

    /// Role metadata; `None` until `initialize_role_metadata` is called.
    role_metadata: Option<RoleMetadata>,
    /// IP address of the client, if known.
    client_ip: Option<IpAddr>,
    /// Session variables.
    vars: SessionVars,
    /// Sending half of the notice channel. The session keeps it so that the receiving half
    /// never observes a closed channel.
    notices_tx: mpsc::UnboundedSender<AdapterNotice>,
    /// Receiving half of the notice channel.
    notices_rx: mpsc::UnboundedReceiver<AdapterNotice>,
    /// ID that will be assigned to the next transaction started in this session.
    next_transaction_id: TransactionId,
    /// Randomly generated key exposed through `secret_key`.
    secret_key: u32,
    /// Watch channel carrying updates to the user's external metadata, if any.
    external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    /// Owner of the `QCell`s created by this session (for example statement logging info).
    #[derivative(Debug = "ignore")]
    qcell_owner: QCellOwner,
    /// In-memory timestamp oracles keyed by timeline, created lazily by
    /// `ensure_timestamp_oracle`.
    session_oracles: BTreeMap<Timeline, InMemoryTimestampOracle<T, NowFn<T>>>,
    /// Counter incremented when portals are created or removed and when the transaction is
    /// cleared.
    state_revision: u64,
}
130
131impl<T> SessionMetadata for Session<T>
132where
133 T: Debug + Clone + Send + Sync,
134 T: TimestampManipulation,
135{
136 fn conn_id(&self) -> &ConnectionId {
137 &self.conn_id
138 }
139
140 fn client_ip(&self) -> Option<&IpAddr> {
141 self.client_ip.as_ref()
142 }
143
144 fn pcx(&self) -> &PlanContext {
145 &self
146 .transaction()
147 .inner()
148 .expect("no active transaction")
149 .pcx
150 }
151
152 fn role_metadata(&self) -> &RoleMetadata {
153 self.role_metadata
154 .as_ref()
155 .expect("role_metadata invariant violated")
156 }
157
158 fn vars(&self) -> &SessionVars {
159 &self.vars
160 }
161}
162
/// An owned snapshot of the session properties exposed through [`SessionMetadata`], produced
/// by [`Session::meta`].
#[derive(Debug)]
pub struct SessionMeta {
    /// ID of the connection the snapshot was taken from.
    conn_id: ConnectionId,
    /// IP address of the client, if known.
    client_ip: Option<IpAddr>,
    /// Planning context copied from the session's transaction.
    pcx: PlanContext,
    /// Role metadata copied from the session.
    role_metadata: RoleMetadata,
    /// Session variables copied from the session.
    vars: SessionVars,
}
173
impl SessionMetadata for SessionMeta {
    /// Returns the captured session variables.
    fn vars(&self) -> &SessionVars {
        &self.vars
    }

    /// Returns the captured connection ID.
    fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    /// Returns the captured client IP address, if any.
    fn client_ip(&self) -> Option<&IpAddr> {
        self.client_ip.as_ref()
    }

    /// Returns the captured planning context.
    fn pcx(&self) -> &PlanContext {
        &self.pcx
    }

    /// Returns the captured role metadata.
    fn role_metadata(&self) -> &RoleMetadata {
        &self.role_metadata
    }
}
195
/// Parameters needed to construct a [`Session`].
#[derive(Debug, Clone)]
pub struct SessionConfig {
    /// ID of the connection the session belongs to.
    pub conn_id: ConnectionId,
    /// UUID to assign to the session.
    pub uuid: Uuid,
    /// IP address of the client, if known.
    pub client_ip: Option<IpAddr>,
    /// Name of the user the session runs as.
    pub user: String,
    /// Watch channel carrying the user's external metadata, if any. Its current value seeds
    /// the session's user when the session is created.
    pub external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    /// Helm chart version, forwarded to the session variables on construction.
    pub helm_chart_version: Option<String>,
    /// The kind of authenticator recorded on the session's user.
    pub authenticator_kind: AuthenticatorKind,
}
220
221impl<T: TimestampManipulation> Session<T> {
    /// Creates a new session for the connection described by `config`.
    ///
    /// # Panics
    ///
    /// Panics if `config.conn_id` equals `DUMMY_CONNECTION_ID`, which is the ID used by
    /// [`Session::dummy`].
    pub(crate) fn new(
        build_info: &'static BuildInfo,
        config: SessionConfig,
        metrics: SessionMetrics,
    ) -> Session<T> {
        assert_ne!(config.conn_id, DUMMY_CONNECTION_ID);
        Self::new_internal(build_info, config, metrics)
    }
231
232 pub fn meta(&self) -> SessionMeta {
235 SessionMeta {
236 conn_id: self.conn_id().clone(),
237 client_ip: self.client_ip().copied(),
238 pcx: self.pcx().clone(),
239 role_metadata: self.role_metadata().clone(),
240 vars: self.vars.clone(),
241 }
242
243 }
245
246 pub(crate) fn mint_logging<A: AstInfo>(
256 &self,
257 raw_sql: String,
258 stmt: Option<&Statement<A>>,
259 now: EpochMillis,
260 ) -> Arc<QCell<PreparedStatementLoggingInfo>> {
261 Arc::new(QCell::new(
262 &self.qcell_owner,
263 PreparedStatementLoggingInfo::still_to_log(
264 raw_sql,
265 stmt,
266 now,
267 "".to_string(),
268 self.uuid,
269 false,
270 ),
271 ))
272 }
273
274 pub(crate) fn qcell_ro<'a, T2: 'a>(&'a self, cell: &'a Arc<QCell<T2>>) -> &'a T2 {
275 self.qcell_owner.ro(&*cell)
276 }
277
278 pub(crate) fn qcell_rw<'a, T2: 'a>(&'a mut self, cell: &'a Arc<QCell<T2>>) -> &'a mut T2 {
279 self.qcell_owner.rw(&*cell)
280 }
281
    /// Returns the session's UUID.
    pub fn uuid(&self) -> Uuid {
        self.uuid
    }
287
288 pub fn dummy() -> Session<T> {
293 let registry = MetricsRegistry::new();
294 let metrics = Metrics::register_into(®istry);
295 let metrics = metrics.session_metrics();
296 let mut dummy = Self::new_internal(
297 &DUMMY_BUILD_INFO,
298 SessionConfig {
299 conn_id: DUMMY_CONNECTION_ID,
300 uuid: Uuid::new_v4(),
301 user: SYSTEM_USER.name.clone(),
302 client_ip: None,
303 external_metadata_rx: None,
304 helm_chart_version: None,
305 authenticator_kind: AuthenticatorKind::None,
306 },
307 metrics,
308 );
309 dummy.initialize_role_metadata(RoleId::User(0));
310 dummy
311 }
312
313 fn new_internal(
314 build_info: &'static BuildInfo,
315 SessionConfig {
316 conn_id,
317 uuid,
318 user,
319 client_ip,
320 mut external_metadata_rx,
321 helm_chart_version,
322 authenticator_kind,
323 }: SessionConfig,
324 metrics: SessionMetrics,
325 ) -> Session<T> {
326 let (notices_tx, notices_rx) = mpsc::unbounded_channel();
327 let default_cluster = INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER.get(&user);
328 let user = User {
329 name: user,
330 internal_metadata: None,
331 external_metadata: external_metadata_rx
332 .as_mut()
333 .map(|rx| rx.borrow_and_update().clone()),
334 authenticator_kind: Some(authenticator_kind),
335 };
336 let mut vars = SessionVars::new_unchecked(build_info, user, helm_chart_version);
337 if let Some(default_cluster) = default_cluster {
338 vars.set_cluster(default_cluster.clone());
339 }
340 Session {
341 conn_id,
342 uuid,
343 transaction: TransactionStatus::Default,
344 pcx: None,
345 metrics,
346 builtin_updates: None,
347 prepared_statements: BTreeMap::new(),
348 portals: BTreeMap::new(),
349 role_metadata: None,
350 client_ip,
351 vars,
352 notices_tx,
353 notices_rx,
354 next_transaction_id: 0,
355 secret_key: rand::random(),
356 external_metadata_rx,
357 qcell_owner: QCellOwner::new(),
358 session_oracles: BTreeMap::new(),
359 state_revision: 0,
360 }
361 }
362
    /// Returns the secret key that was randomly generated when the session was created.
    pub fn secret_key(&self) -> u32 {
        self.secret_key
    }
367
368 fn new_pcx(&self, mut wall_time: DateTime<Utc>) -> PlanContext {
369 if let Some(mock_time) = self.vars().unsafe_new_transaction_wall_time() {
370 wall_time = *mock_time;
371 }
372 PlanContext::new(wall_time)
373 }
374
    /// Starts an explicit transaction, or upgrades the current transaction to an explicit one.
    ///
    /// `access` and `isolation_level`, when provided, are applied to the resulting transaction.
    ///
    /// # Errors
    ///
    /// Returns `AdapterError::ReadWriteUnavailable` if `READ WRITE` access is requested while
    /// the current transaction is `READ ONLY` and has already recorded peeks or a subscribe.
    ///
    /// # Panics
    ///
    /// Panics if the current transaction is in the failed state.
    pub fn start_transaction(
        &mut self,
        wall_time: DateTime<Utc>,
        access: Option<TransactionAccessMode>,
        isolation_level: Option<TransactionIsolationLevel>,
    ) -> Result<(), AdapterError> {
        // Validate the requested access mode against the existing transaction, if any.
        if let Some(txn) = self.transaction.inner() {
            // Switching to read-write is only prohibited once a read-only transaction has
            // actually performed a read.
            let read_write_prohibited = match txn.ops {
                TransactionOps::Peeks { .. } | TransactionOps::Subscribe => {
                    txn.access == Some(TransactionAccessMode::ReadOnly)
                }
                TransactionOps::None
                | TransactionOps::Writes(_)
                | TransactionOps::SingleStatement { .. }
                | TransactionOps::DDL { .. } => false,
            };

            if read_write_prohibited && access == Some(TransactionAccessMode::ReadWrite) {
                return Err(AdapterError::ReadWriteUnavailable);
            }
        }

        // Take the status out so the contained transaction can be moved into its new state.
        match std::mem::take(&mut self.transaction) {
            TransactionStatus::Default => {
                // No transaction yet: allocate an ID and start a fresh explicit transaction.
                let id = self.next_transaction_id;
                self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
                self.transaction = TransactionStatus::InTransaction(Transaction {
                    pcx: self.new_pcx(wall_time),
                    ops: TransactionOps::None,
                    write_lock_guards: None,
                    access,
                    id,
                });
            }
            TransactionStatus::Started(mut txn)
            | TransactionStatus::InTransactionImplicit(mut txn)
            | TransactionStatus::InTransaction(mut txn) => {
                // Keep the existing transaction, overriding its access mode only when a new
                // one was explicitly requested.
                if access.is_some() {
                    txn.access = access;
                }
                self.transaction = TransactionStatus::InTransaction(txn);
            }
            TransactionStatus::Failed(_) => unreachable!(),
        };

        if let Some(isolation_level) = isolation_level {
            self.vars
                .set_local_transaction_isolation(isolation_level.into());
        }

        Ok(())
    }
433
434 pub fn start_transaction_implicit(&mut self, wall_time: DateTime<Utc>, stmts: usize) {
437 if let TransactionStatus::Default = self.transaction {
438 let id = self.next_transaction_id;
439 self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
440 let txn = Transaction {
441 pcx: self.new_pcx(wall_time),
442 ops: TransactionOps::None,
443 write_lock_guards: None,
444 access: None,
445 id,
446 };
447 match stmts {
448 1 => self.transaction = TransactionStatus::Started(txn),
449 n if n > 1 => self.transaction = TransactionStatus::InTransactionImplicit(txn),
450 _ => {}
451 }
452 }
453 }
454
    /// Starts an implicit transaction for a single statement, if the session is not already
    /// in a transaction.
    pub fn start_transaction_single_stmt(&mut self, wall_time: DateTime<Utc>) {
        self.start_transaction_implicit(wall_time, 1);
    }
459
460 #[must_use]
470 pub fn clear_transaction(&mut self) -> TransactionStatus<T> {
471 self.portals.clear();
472 self.pcx = None;
473 self.state_revision += 1;
474 mem::take(&mut self.transaction)
475 }
476
477 pub fn fail_transaction(mut self) -> Self {
479 match self.transaction {
480 TransactionStatus::Default => unreachable!(),
481 TransactionStatus::Started(txn)
482 | TransactionStatus::InTransactionImplicit(txn)
483 | TransactionStatus::InTransaction(txn) => {
484 self.transaction = TransactionStatus::Failed(txn);
485 }
486 TransactionStatus::Failed(_) => {}
487 };
488 self
489 }
490
    /// Returns the current transaction status.
    pub fn transaction(&self) -> &TransactionStatus<T> {
        &self.transaction
    }
495
    /// Returns the current transaction status mutably.
    pub fn transaction_mut(&mut self) -> &mut TransactionStatus<T> {
        &mut self.transaction
    }
500
    /// Converts the current transaction status into a [`TransactionCode`].
    pub fn transaction_code(&self) -> TransactionCode {
        self.transaction().into()
    }
505
    /// Adds `add_ops` to the current transaction by delegating to
    /// [`TransactionStatus::add_ops`].
    ///
    /// # Errors
    ///
    /// Returns an error if `add_ops` cannot be combined with the operations already recorded.
    ///
    /// # Panics
    ///
    /// Panics if the session has no transaction or the transaction has failed.
    pub fn add_transaction_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
        self.transaction.add_ops(add_ops)
    }
512
    /// Returns a clone of the sending half of the session's notice channel.
    pub fn retain_notice_transmitter(&self) -> UnboundedSender<AdapterNotice> {
        self.notices_tx.clone()
    }
517
518 pub fn add_notice(&self, notice: AdapterNotice) {
520 self.add_notices([notice])
521 }
522
523 pub fn add_notices(&self, notices: impl IntoIterator<Item = AdapterNotice>) {
525 for notice in notices {
526 let _ = self.notices_tx.send(notice);
527 }
528 }
529
530 pub async fn recv_notice(&mut self) -> AdapterNotice {
534 loop {
536 let notice = self
537 .notices_rx
538 .recv()
539 .await
540 .expect("Session also holds a sender, so recv won't ever return None");
541 match self.notice_filter(notice) {
542 Some(notice) => return notice,
543 None => continue,
544 }
545 }
546 }
547
548 pub fn drain_notices(&mut self) -> Vec<AdapterNotice> {
550 let mut notices = Vec::new();
551 while let Ok(notice) = self.notices_rx.try_recv() {
552 if let Some(notice) = self.notice_filter(notice) {
553 notices.push(notice);
554 }
555 }
556 notices
557 }
558
559 fn notice_filter(&self, notice: AdapterNotice) -> Option<AdapterNotice> {
561 let minimum_client_severity = self.vars.client_min_messages();
563 let sev = notice.severity();
564 if !minimum_client_severity.should_output_to_client(&sev) {
565 return None;
566 }
567 if let AdapterNotice::ClusterReplicaStatusChanged { cluster, .. } = ¬ice {
569 if cluster != self.vars.cluster() {
570 return None;
571 }
572 }
573 Some(notice)
574 }
575
576 pub fn clear_transaction_ops(&mut self) {
579 if let Some(txn) = self.transaction.inner_mut() {
580 txn.ops = TransactionOps::None;
581 }
582 }
583
584 pub fn take_transaction_timestamp_context(&mut self) -> Option<TimestampContext<T>> {
589 if let Some(Transaction { ops, .. }) = self.transaction.inner_mut() {
590 if let TransactionOps::Peeks { .. } = ops {
591 let ops = std::mem::take(ops);
592 Some(
593 ops.timestamp_determination()
594 .expect("checked above")
595 .timestamp_context,
596 )
597 } else {
598 None
599 }
600 } else {
601 None
602 }
603 }
604
605 pub fn get_transaction_timestamp_determination(&self) -> Option<TimestampDetermination<T>> {
610 match self.transaction.inner() {
611 Some(Transaction {
612 pcx: _,
613 ops: TransactionOps::Peeks { determination, .. },
614 write_lock_guards: _,
615 access: _,
616 id: _,
617 }) => Some(determination.clone()),
618 _ => None,
619 }
620 }
621
622 pub fn contains_read_timestamp(&self) -> bool {
624 matches!(
625 self.transaction.inner(),
626 Some(Transaction {
627 pcx: _,
628 ops: TransactionOps::Peeks {
629 determination: TimestampDetermination {
630 timestamp_context: TimestampContext::TimelineTimestamp { .. },
631 ..
632 },
633 ..
634 },
635 write_lock_guards: _,
636 access: _,
637 id: _,
638 })
639 )
640 }
641
642 pub fn set_prepared_statement(
644 &mut self,
645 name: String,
646 stmt: Option<Statement<Raw>>,
647 raw_sql: String,
648 desc: StatementDesc,
649 state_revision: StateRevision,
650 now: EpochMillis,
651 ) {
652 let logging = PreparedStatementLoggingInfo::still_to_log(
653 raw_sql,
654 stmt.as_ref(),
655 now,
656 name.clone(),
657 self.uuid,
658 false,
659 );
660 let statement = PreparedStatement {
661 stmt,
662 desc,
663 state_revision,
664 logging: Arc::new(QCell::new(&self.qcell_owner, logging)),
665 };
666 self.prepared_statements.insert(name, statement);
667 }
668
    /// Removes the prepared statement named `name`. Returns whether such a statement existed.
    pub fn remove_prepared_statement(&mut self, name: &str) -> bool {
        self.prepared_statements.remove(name).is_some()
    }
675
    /// Removes every prepared statement from the session.
    pub fn remove_all_prepared_statements(&mut self) {
        self.prepared_statements.clear();
    }
680
    /// Looks up the prepared statement named `name` without performing any validation of it.
    pub fn get_prepared_statement_unverified(&self, name: &str) -> Option<&PreparedStatement> {
        self.prepared_statements.get(name)
    }
688
    /// Looks up the prepared statement named `name` mutably without performing any validation
    /// of it.
    pub fn get_prepared_statement_mut_unverified(
        &mut self,
        name: &str,
    ) -> Option<&mut PreparedStatement> {
        self.prepared_statements.get_mut(name)
    }
699
    /// Returns all prepared statements, keyed by name.
    pub fn prepared_statements(&self) -> &BTreeMap<String, PreparedStatement> {
        &self.prepared_statements
    }
704
    /// Returns all portals, keyed by name.
    pub fn portals(&self) -> &BTreeMap<String, Portal> {
        &self.portals
    }
709
710 pub fn set_portal(
720 &mut self,
721 portal_name: String,
722 desc: StatementDesc,
723 stmt: Option<Statement<Raw>>,
724 logging: Arc<QCell<PreparedStatementLoggingInfo>>,
725 params: Vec<(Datum, SqlScalarType)>,
726 result_formats: Vec<Format>,
727 state_revision: StateRevision,
728 ) -> Result<(), AdapterError> {
729 if !portal_name.is_empty() && self.portals.contains_key(&portal_name) {
731 return Err(AdapterError::DuplicateCursor(portal_name));
732 }
733 self.state_revision += 1;
734 let param_types = desc.param_types.clone();
735 self.portals.insert(
736 portal_name,
737 Portal {
738 stmt: stmt.map(Arc::new),
739 desc,
740 state_revision,
741 parameters: Params {
742 datums: Row::pack(params.iter().map(|(d, _t)| d)),
743 execute_types: params.into_iter().map(|(_d, t)| t).collect(),
744 expected_types: param_types,
745 },
746 result_formats,
747 state: PortalState::NotStarted,
748 logging,
749 lifecycle_timestamps: None,
750 },
751 );
752 Ok(())
753 }
754
755 pub fn remove_portal(&mut self, portal_name: &str) -> bool {
759 self.state_revision += 1;
760 self.portals.remove(portal_name).is_some()
761 }
762
    /// Looks up the portal named `portal_name` without performing any validation of it.
    pub fn get_portal_unverified(&self, portal_name: &str) -> Option<&Portal> {
        self.portals.get(portal_name)
    }
769
770 pub fn get_portal_unverified_mut(&mut self, portal_name: &str) -> Option<PortalRefMut<'_>> {
777 self.portals.get_mut(portal_name).map(|p| PortalRefMut {
778 stmt: &p.stmt,
779 desc: &p.desc,
780 state_revision: &mut p.state_revision,
781 parameters: &mut p.parameters,
782 result_formats: &mut p.result_formats,
783 logging: &mut p.logging,
784 state: &mut p.state,
785 lifecycle_timestamps: &mut p.lifecycle_timestamps,
786 })
787 }
788
789 pub fn create_new_portal(
791 &mut self,
792 stmt: Option<Statement<Raw>>,
793 logging: Arc<QCell<PreparedStatementLoggingInfo>>,
794 desc: StatementDesc,
795 parameters: Params,
796 result_formats: Vec<Format>,
797 state_revision: StateRevision,
798 ) -> Result<String, AdapterError> {
799 self.state_revision += 1;
800
801 for i in 0usize.. {
803 let name = format!("<unnamed portal {}>", i);
804 match self.portals.entry(name.clone()) {
805 Entry::Occupied(_) => continue,
806 Entry::Vacant(entry) => {
807 entry.insert(Portal {
808 stmt: stmt.map(Arc::new),
809 desc,
810 state_revision,
811 parameters,
812 result_formats,
813 state: PortalState::NotStarted,
814 logging,
815 lifecycle_timestamps: None,
816 });
817 return Ok(name);
818 }
819 }
820 }
821
822 coord_bail!("unable to create a new portal");
823 }
824
825 pub fn reset(&mut self) {
828 let _ = self.clear_transaction();
829 self.prepared_statements.clear();
830 self.vars.reset_all();
831 }
832
    /// Returns the value of the `application_name` session variable.
    pub fn application_name(&self) -> &str {
        self.vars.application_name()
    }
839
    /// Returns the session variables.
    pub fn vars(&self) -> &SessionVars {
        &self.vars
    }
844
    /// Returns the session variables mutably.
    pub fn vars_mut(&mut self) -> &mut SessionVars {
        &mut self.vars
    }
849
    /// Attempts to attach `guards` to the current transaction by delegating to
    /// [`TransactionStatus::try_grant_write_locks`].
    ///
    /// # Errors
    ///
    /// Returns the write locks already held if the transaction has some.
    ///
    /// # Panics
    ///
    /// Panics if the session has no transaction.
    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
        self.transaction.try_grant_write_locks(guards)
    }
858
859 pub fn apply_external_metadata_updates(&mut self) {
861 let Some(rx) = &mut self.external_metadata_rx else {
863 return;
864 };
865
866 if !rx.has_changed().unwrap_or(false) {
868 return;
869 }
870
871 let metadata = rx.borrow_and_update().clone();
874 self.vars.set_external_user_metadata(metadata);
875 }
876
    /// Stores `metadata` as the internal user metadata in the session variables.
    pub fn apply_internal_user_metadata(&mut self, metadata: InternalUserMetadata) {
        self.vars.set_internal_user_metadata(metadata);
    }
881
    /// Sets the session's role metadata to a fresh `RoleMetadata` built from `role_id`,
    /// replacing any previous value.
    pub fn initialize_role_metadata(&mut self, role_id: RoleId) {
        self.role_metadata = Some(RoleMetadata::new(role_id));
    }
886
887 pub fn ensure_timestamp_oracle(
890 &mut self,
891 timeline: Timeline,
892 ) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
893 self.session_oracles
894 .entry(timeline)
895 .or_insert_with(|| InMemoryTimestampOracle::new(T::minimum(), NowFn::from(T::minimum)))
896 }
897
    /// Returns the session-local timestamp oracle for the `EpochMilliseconds` timeline,
    /// creating it first if it does not exist yet.
    pub fn ensure_local_timestamp_oracle(&mut self) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
        self.ensure_timestamp_oracle(Timeline::EpochMilliseconds)
    }
903
    /// Returns the session-local timestamp oracle for `timeline`, if one has been created.
    pub fn get_timestamp_oracle(
        &self,
        timeline: &Timeline,
    ) -> Option<&InMemoryTimestampOracle<T, NowFn<T>>> {
        self.session_oracles.get(timeline)
    }
911
912 pub fn apply_write(&mut self, timestamp: T) {
915 if self.vars().transaction_isolation() == &IsolationLevel::StrongSessionSerializable {
916 self.ensure_local_timestamp_oracle().apply_write(timestamp);
917 }
918 }
919
    /// Returns the session's metrics handles.
    pub fn metrics(&self) -> &SessionMetrics {
        &self.metrics
    }
924
925 pub fn set_builtin_table_updates(&mut self, fut: BuiltinTableAppendNotify) {
927 let prev = self.builtin_updates.replace(fut);
928 mz_ore::soft_assert_or_log!(prev.is_none(), "replacing old builtin table notify");
929 }
930
931 pub fn clear_builtin_table_updates(&mut self) -> Option<impl Future<Output = ()> + 'static> {
934 if let Some(fut) = self.builtin_updates.take() {
935 let histogram = self
937 .metrics()
938 .session_startup_table_writes_seconds()
939 .clone();
940 Some(async move {
941 fut.wall_time().observe(histogram).await;
942 })
943 } else {
944 None
945 }
946 }
947
    /// Returns the session state revision, a counter incremented when portals are created or
    /// removed and when the transaction is cleared.
    pub fn state_revision(&self) -> u64 {
        self.state_revision
    }
953}
954
/// A statement that has been prepared in a session, together with its description.
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct PreparedStatement {
    /// The parsed statement, if any.
    stmt: Option<Statement<Raw>>,
    /// Description of the statement.
    desc: StatementDesc,
    /// The state revision supplied when the statement was prepared.
    pub state_revision: StateRevision,
    /// Statement-logging info for this prepared statement; excluded from `Debug` output.
    #[derivative(Debug = "ignore")]
    logging: Arc<QCell<PreparedStatementLoggingInfo>>,
}
966
impl PreparedStatement {
    /// Returns the parsed statement, if any.
    pub fn stmt(&self) -> Option<&Statement<Raw>> {
        self.stmt.as_ref()
    }

    /// Returns the description of the statement.
    pub fn desc(&self) -> &StatementDesc {
        &self.desc
    }

    /// Returns the statement-logging info for this prepared statement.
    pub fn logging(&self) -> &Arc<QCell<PreparedStatementLoggingInfo>> {
        &self.logging
    }
}
984
/// A statement bound to its parameters, together with its execution state.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Portal {
    /// The bound statement, if any.
    pub stmt: Option<Arc<Statement<Raw>>>,
    /// Description of the statement.
    pub desc: StatementDesc,
    /// The state revision supplied when the portal was created.
    pub state_revision: StateRevision,
    /// The bound parameters.
    pub parameters: Params,
    /// The formats requested for result columns.
    pub result_formats: Vec<Format>,
    /// Statement-logging info for the portal; excluded from `Debug` output.
    #[derivative(Debug = "ignore")]
    pub logging: Arc<QCell<PreparedStatementLoggingInfo>>,
    /// Execution state of the portal; excluded from `Debug` output.
    #[derivative(Debug = "ignore")]
    pub state: PortalState,
    /// Lifecycle timestamps for the portal's statement, if recorded. `None` on creation.
    pub lifecycle_timestamps: Option<LifecycleTimestamps>,
}
1008
/// A view onto a [`Portal`] that exposes the statement and its description read-only and
/// every other field mutably.
pub struct PortalRefMut<'a> {
    /// The bound statement, if any (read-only).
    pub stmt: &'a Option<Arc<Statement<Raw>>>,
    /// Description of the statement (read-only).
    pub desc: &'a StatementDesc,
    /// The state revision recorded on the portal.
    pub state_revision: &'a mut StateRevision,
    /// The bound parameters.
    pub parameters: &'a mut Params,
    /// The formats requested for result columns.
    pub result_formats: &'a mut Vec<Format>,
    /// Statement-logging info for the portal.
    pub logging: &'a mut Arc<QCell<PreparedStatementLoggingInfo>>,
    /// Execution state of the portal.
    pub state: &'a mut PortalState,
    /// Lifecycle timestamps for the portal's statement, if recorded.
    pub lifecycle_timestamps: &'a mut Option<LifecycleTimestamps>,
}
1031
/// A pair of revision counters recorded on prepared statements and portals.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct StateRevision {
    /// Revision of the catalog.
    pub catalog_revision: u64,
    /// Revision of the session state.
    // NOTE(review): presumably the value of `Session::state_revision` at capture time — confirm.
    pub session_state_revision: u64,
}
1042
/// Execution state of a [`Portal`].
pub enum PortalState {
    /// Execution has not begun. Portals are created in this state.
    NotStarted,
    /// Execution is underway, optionally with rows still to be delivered.
    InProgress(Option<InProgressRows>),
    /// Execution has finished.
    // NOTE(review): the optional string is presumably a completion tag — confirm.
    Completed(Option<String>),
}
1055
/// Rows of an in-progress portal that have not been delivered yet.
pub struct InProgressRows {
    /// The row iterator currently being consumed, if any.
    pub current: Option<Box<dyn RowIterator + Send + Sync>>,
    /// The stream that supplies further rows.
    pub remaining: RecordFirstRowStream,
}
1063
1064impl InProgressRows {
1065 pub fn new(remaining: RecordFirstRowStream) -> Self {
1067 Self {
1068 current: None,
1069 remaining,
1070 }
1071 }
1072
1073 pub fn no_more_rows(&self) -> bool {
1076 self.remaining.no_more_rows && self.current.is_none()
1077 }
1078}
1079
/// The receiving half of an unbounded channel of [`PeekResponseUnary`] values.
pub type RowBatchStream = UnboundedReceiver<PeekResponseUnary>;
1082
/// Timestamps recorded over the lifecycle of a statement.
#[derive(Debug, Clone)]
pub struct LifecycleTimestamps {
    /// The "received" timestamp, in milliseconds since the epoch.
    // NOTE(review): presumably when the statement was received from the client — confirm.
    pub received: EpochMillis,
}
1093
impl LifecycleTimestamps {
    /// Creates a `LifecycleTimestamps` with the given `received` timestamp.
    pub fn new(received: EpochMillis) -> Self {
        Self { received }
    }
}
1100
/// The transaction status of a session.
#[derive(Debug)]
pub enum TransactionStatus<T> {
    /// The session is not in a transaction.
    Default,
    /// An implicit transaction started for a single statement.
    Started(Transaction<T>),
    /// An explicitly started transaction.
    InTransaction(Transaction<T>),
    /// An implicit transaction started for more than one statement.
    InTransactionImplicit(Transaction<T>),
    /// A transaction that has been marked as failed.
    Failed(Transaction<T>),
}
1126
1127impl<T: TimestampManipulation> TransactionStatus<T> {
1128 pub fn into_ops_and_lock_guard(self) -> (Option<TransactionOps<T>>, Option<WriteLocks>) {
1130 match self {
1131 TransactionStatus::Default | TransactionStatus::Failed(_) => (None, None),
1132 TransactionStatus::Started(txn)
1133 | TransactionStatus::InTransaction(txn)
1134 | TransactionStatus::InTransactionImplicit(txn) => {
1135 (Some(txn.ops), txn.write_lock_guards)
1136 }
1137 }
1138 }
1139
1140 pub fn inner(&self) -> Option<&Transaction<T>> {
1142 match self {
1143 TransactionStatus::Default => None,
1144 TransactionStatus::Started(txn)
1145 | TransactionStatus::InTransaction(txn)
1146 | TransactionStatus::InTransactionImplicit(txn)
1147 | TransactionStatus::Failed(txn) => Some(txn),
1148 }
1149 }
1150
1151 pub fn inner_mut(&mut self) -> Option<&mut Transaction<T>> {
1153 match self {
1154 TransactionStatus::Default => None,
1155 TransactionStatus::Started(txn)
1156 | TransactionStatus::InTransaction(txn)
1157 | TransactionStatus::InTransactionImplicit(txn)
1158 | TransactionStatus::Failed(txn) => Some(txn),
1159 }
1160 }
1161
1162 pub fn is_ddl(&self) -> bool {
1164 match self {
1165 TransactionStatus::Default => false,
1166 TransactionStatus::Started(txn)
1167 | TransactionStatus::InTransaction(txn)
1168 | TransactionStatus::InTransactionImplicit(txn)
1169 | TransactionStatus::Failed(txn) => {
1170 matches!(txn.ops, TransactionOps::DDL { .. })
1171 }
1172 }
1173 }
1174
1175 pub fn is_implicit(&self) -> bool {
1178 match self {
1179 TransactionStatus::Started(_) | TransactionStatus::InTransactionImplicit(_) => true,
1180 TransactionStatus::Default
1181 | TransactionStatus::InTransaction(_)
1182 | TransactionStatus::Failed(_) => false,
1183 }
1184 }
1185
1186 pub fn is_in_multi_statement_transaction(&self) -> bool {
1188 match self {
1189 TransactionStatus::InTransaction(_) | TransactionStatus::InTransactionImplicit(_) => {
1190 true
1191 }
1192 TransactionStatus::Default
1193 | TransactionStatus::Started(_)
1194 | TransactionStatus::Failed(_) => false,
1195 }
1196 }
1197
    /// Reports whether the session is in a multi-statement transaction and `when` is
    /// `QueryWhen::Immediately`.
    pub fn in_immediate_multi_stmt_txn(&self, when: &QueryWhen) -> bool {
        self.is_in_multi_statement_transaction() && when == &QueryWhen::Immediately
    }
1202
1203 pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1212 match self {
1213 TransactionStatus::Default => panic!("cannot grant write lock to txn not yet started"),
1214 TransactionStatus::Started(txn)
1215 | TransactionStatus::InTransaction(txn)
1216 | TransactionStatus::InTransactionImplicit(txn)
1217 | TransactionStatus::Failed(txn) => txn.try_grant_write_locks(guards),
1218 }
1219 }
1220
1221 pub fn write_locks(&self) -> Option<&WriteLocks> {
1223 match self {
1224 TransactionStatus::Default => None,
1225 TransactionStatus::Started(txn)
1226 | TransactionStatus::InTransaction(txn)
1227 | TransactionStatus::InTransactionImplicit(txn)
1228 | TransactionStatus::Failed(txn) => txn.write_lock_guards.as_ref(),
1229 }
1230 }
1231
1232 pub fn timeline(&self) -> Option<Timeline> {
1234 match self {
1235 TransactionStatus::Default => None,
1236 TransactionStatus::Started(txn)
1237 | TransactionStatus::InTransaction(txn)
1238 | TransactionStatus::InTransactionImplicit(txn)
1239 | TransactionStatus::Failed(txn) => txn.timeline(),
1240 }
1241 }
1242
1243 pub fn cluster(&self) -> Option<ClusterId> {
1245 match self {
1246 TransactionStatus::Default => None,
1247 TransactionStatus::Started(txn)
1248 | TransactionStatus::InTransaction(txn)
1249 | TransactionStatus::InTransactionImplicit(txn)
1250 | TransactionStatus::Failed(txn) => txn.cluster(),
1251 }
1252 }
1253
1254 pub fn catalog_state(&self) -> Option<&CatalogState> {
1256 match self.inner() {
1257 Some(Transaction {
1258 ops: TransactionOps::DDL { state, .. },
1259 ..
1260 }) => Some(state),
1261 _ => None,
1262 }
1263 }
1264
1265 pub fn contains_ops(&self) -> bool {
1267 match self.inner() {
1268 Some(txn) => txn.contains_ops(),
1269 None => false,
1270 }
1271 }
1272
1273 pub fn allows_writes(&self) -> bool {
1277 match self {
1278 TransactionStatus::Started(Transaction { ops, access, .. })
1279 | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1280 | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1281 match ops {
1282 TransactionOps::None => access != &Some(TransactionAccessMode::ReadOnly),
1283 TransactionOps::Peeks { determination, .. } => {
1284 !determination.timestamp_context.contains_timestamp()
1288 }
1289 TransactionOps::Subscribe => false,
1290 TransactionOps::Writes(_) => true,
1291 TransactionOps::SingleStatement { .. } => false,
1292 TransactionOps::DDL { .. } => false,
1293 }
1294 }
1295 TransactionStatus::Default | TransactionStatus::Failed(_) => {
1296 unreachable!()
1297 }
1298 }
1299 }
1300
    /// Merges `add_ops` into the operations already recorded on the transaction.
    ///
    /// The outcome depends on what the transaction has recorded so far:
    ///
    /// - Nothing: `add_ops` is adopted as-is, unless the transaction is `READ ONLY` and
    ///   `add_ops` are writes.
    /// - Peeks: further peeks are merged in; writes replace the peeks only if the peeks have
    ///   no timestamp; anything else is rejected.
    /// - Subscribe: nothing further can be added.
    /// - Writes: further writes are appended; peeks without a timestamp are accepted and
    ///   ignored; anything else is rejected.
    /// - Single statement: nothing further can be added.
    /// - DDL: further DDL at the same revision is merged in; anything else is rejected.
    ///
    /// # Errors
    ///
    /// Returns `ReadOnlyTransaction`, `SubscribeOnlyTransaction`, `WriteOnlyTransaction`,
    /// `SingleStatementTransaction`, `DDLOnlyTransaction`, or `DDLTransactionRace` as described
    /// above.
    ///
    /// # Panics
    ///
    /// Panics if there is no transaction or the transaction has failed, if merged peeks
    /// target different clusters, or if merged peeks carry timeline timestamps that disagree
    /// in timeline or chosen timestamp.
    pub fn add_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
        match self {
            TransactionStatus::Started(Transaction { ops, access, .. })
            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
                match ops {
                    TransactionOps::None => {
                        // A read-only transaction may not start out with writes.
                        if matches!(access, Some(TransactionAccessMode::ReadOnly))
                            && matches!(add_ops, TransactionOps::Writes(_))
                        {
                            return Err(AdapterError::ReadOnlyTransaction);
                        }
                        *ops = add_ops;
                    }
                    TransactionOps::Peeks {
                        determination,
                        cluster_id,
                        requires_linearization,
                    } => match add_ops {
                        TransactionOps::Peeks {
                            determination: add_timestamp_determination,
                            cluster_id: add_cluster_id,
                            requires_linearization: add_requires_linearization,
                        } => {
                            // All peeks in one transaction must run on the same cluster.
                            assert_eq!(*cluster_id, add_cluster_id);
                            match (
                                &determination.timestamp_context,
                                &add_timestamp_determination.timestamp_context,
                            ) {
                                (
                                    TimestampContext::TimelineTimestamp {
                                        timeline: txn_timeline,
                                        chosen_ts: txn_ts,
                                        oracle_ts: _,
                                    },
                                    TimestampContext::TimelineTimestamp {
                                        timeline: add_timeline,
                                        chosen_ts: add_ts,
                                        oracle_ts: _,
                                    },
                                ) => {
                                    // Both sides have a timestamp: they must agree.
                                    assert_eq!(txn_timeline, add_timeline);
                                    assert_eq!(txn_ts, add_ts);
                                }
                                // The transaction had no timestamp yet: adopt the new
                                // determination.
                                (TimestampContext::NoTimestamp, _) => {
                                    *determination = add_timestamp_determination
                                }
                                // The new peek has no timestamp: keep what we have.
                                (_, TimestampContext::NoTimestamp) => {}
                            };
                            // Linearization, once required by any peek, stays required.
                            if matches!(requires_linearization, RequireLinearization::NotRequired)
                                && matches!(
                                    add_requires_linearization,
                                    RequireLinearization::Required
                                )
                            {
                                *requires_linearization = add_requires_linearization;
                            }
                        }
                        // Peeks without a timestamp do not pin the transaction to a read
                        // time, so it may still turn into a write transaction.
                        writes @ TransactionOps::Writes(..)
                            if !determination.timestamp_context.contains_timestamp() =>
                        {
                            *ops = writes;
                        }
                        _ => return Err(AdapterError::ReadOnlyTransaction),
                    },
                    TransactionOps::Subscribe => {
                        return Err(AdapterError::SubscribeOnlyTransaction);
                    }
                    TransactionOps::Writes(txn_writes) => match add_ops {
                        TransactionOps::Writes(mut add_writes) => {
                            // A read-only transaction should never have recorded writes in
                            // the first place; double-check that invariant here.
                            assert!(!matches!(access, Some(TransactionAccessMode::ReadOnly)));
                            txn_writes.append(&mut add_writes);
                        }
                        // Peeks without a timestamp are permitted in a write transaction
                        // and leave the recorded writes untouched.
                        TransactionOps::Peeks { determination, .. }
                            if !determination.timestamp_context.contains_timestamp() => {}
                        _ => {
                            return Err(AdapterError::WriteOnlyTransaction);
                        }
                    },
                    TransactionOps::SingleStatement { .. } => {
                        return Err(AdapterError::SingleStatementTransaction);
                    }
                    TransactionOps::DDL {
                        ops: og_ops,
                        revision: og_revision,
                        state: og_state,
                        side_effects,
                        snapshot: og_snapshot,
                    } => match add_ops {
                        TransactionOps::DDL {
                            ops: new_ops,
                            revision: new_revision,
                            side_effects: mut net_new_side_effects,
                            state: new_state,
                            snapshot: new_snapshot,
                        } => {
                            // The new DDL must be based on the same revision as the
                            // transaction's existing DDL.
                            if *og_revision != new_revision {
                                return Err(AdapterError::DDLTransactionRace);
                            }
                            // Non-empty new ops replace the existing ops, state, and
                            // snapshot wholesale rather than extending them.
                            if !new_ops.is_empty() {
                                *og_ops = new_ops;
                                *og_state = new_state;
                                *og_snapshot = new_snapshot;
                            }
                            // Side effects, in contrast, accumulate.
                            side_effects.append(&mut net_new_side_effects);
                        }
                        _ => return Err(AdapterError::DDLOnlyTransaction),
                    },
                }
            }
            TransactionStatus::Default | TransactionStatus::Failed(_) => {
                unreachable!()
            }
        }
        Ok(())
    }
1437}
1438
/// Identifier type stored in [`Transaction::id`].
pub type TransactionId = u64;
1441
1442impl<T> Default for TransactionStatus<T> {
1443 fn default() -> Self {
1444 TransactionStatus::Default
1445 }
1446}
1447
1448#[derive(Debug)]
1450pub struct Transaction<T> {
1451 pub pcx: PlanContext,
1453 pub ops: TransactionOps<T>,
1455 pub id: TransactionId,
1460 write_lock_guards: Option<WriteLocks>,
1462 access: Option<TransactionAccessMode>,
1464}
1465
1466impl<T> Transaction<T> {
1467 fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1470 match &mut self.write_lock_guards {
1471 Some(existing) => Err(existing),
1472 locks @ None => {
1473 *locks = Some(guards);
1474 Ok(())
1475 }
1476 }
1477 }
1478
1479 fn timeline(&self) -> Option<Timeline> {
1481 match &self.ops {
1482 TransactionOps::Peeks {
1483 determination:
1484 TimestampDetermination {
1485 timestamp_context: TimestampContext::TimelineTimestamp { timeline, .. },
1486 ..
1487 },
1488 ..
1489 } => Some(timeline.clone()),
1490 TransactionOps::Peeks { .. }
1491 | TransactionOps::None
1492 | TransactionOps::Subscribe
1493 | TransactionOps::Writes(_)
1494 | TransactionOps::SingleStatement { .. }
1495 | TransactionOps::DDL { .. } => None,
1496 }
1497 }
1498
1499 pub fn cluster(&self) -> Option<ClusterId> {
1501 match &self.ops {
1502 TransactionOps::Peeks { cluster_id, .. } => Some(cluster_id.clone()),
1503 TransactionOps::None
1504 | TransactionOps::Subscribe
1505 | TransactionOps::Writes(_)
1506 | TransactionOps::SingleStatement { .. }
1507 | TransactionOps::DDL { .. } => None,
1508 }
1509 }
1510
1511 fn contains_ops(&self) -> bool {
1513 !matches!(self.ops, TransactionOps::None)
1514 }
1515}
1516
/// Coarse transaction state, convertible to a single status byte (or a one-character string).
///
/// NOTE(review): the bytes `I`/`T`/`E` look like the PostgreSQL wire protocol's
/// `ReadyForQuery` transaction status indicators — confirm against the pgwire layer.
#[derive(Debug, Clone, Copy)]
pub enum TransactionCode {
    /// No transaction in progress; encoded as `b'I'`.
    Idle,
    /// A transaction is in progress; encoded as `b'T'`.
    InTransaction,
    /// The transaction has failed; encoded as `b'E'`.
    Failed,
}

impl From<TransactionCode> for u8 {
    /// Maps the code to its single ASCII status byte.
    fn from(code: TransactionCode) -> Self {
        use TransactionCode::{Failed, Idle, InTransaction};

        match code {
            Idle => b'I',
            InTransaction => b'T',
            Failed => b'E',
        }
    }
}

impl From<TransactionCode> for String {
    /// Renders the code as a one-character string holding its status byte.
    fn from(code: TransactionCode) -> Self {
        let status_byte = u8::from(code);
        String::from(char::from(status_byte))
    }
}
1543
1544impl<T> From<&TransactionStatus<T>> for TransactionCode {
1545 fn from(status: &TransactionStatus<T>) -> TransactionCode {
1547 match status {
1548 TransactionStatus::Default => TransactionCode::Idle,
1549 TransactionStatus::Started(_) => TransactionCode::InTransaction,
1550 TransactionStatus::InTransaction(_) => TransactionCode::InTransaction,
1551 TransactionStatus::InTransactionImplicit(_) => TransactionCode::InTransaction,
1552 TransactionStatus::Failed(_) => TransactionCode::Failed,
1553 }
1554 }
1555}
1556
1557#[derive(Derivative)]
1563#[derivative(Debug)]
1564pub enum TransactionOps<T> {
1565 None,
1568 Peeks {
1573 determination: TimestampDetermination<T>,
1575 cluster_id: ClusterId,
1577 requires_linearization: RequireLinearization,
1579 },
1580 Subscribe,
1582 Writes(Vec<WriteOp>),
1585 SingleStatement {
1587 stmt: Arc<Statement<Raw>>,
1589 params: mz_sql::plan::Params,
1591 },
1592 DDL {
1596 ops: Vec<crate::catalog::Op>,
1598 state: CatalogState,
1600 #[derivative(Debug = "ignore")]
1602 side_effects: Vec<
1603 Box<
1604 dyn for<'a> FnOnce(
1605 &'a mut Coordinator,
1606 Option<&'a mut ExecuteContext>,
1607 ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
1608 + Send
1609 + Sync,
1610 >,
1611 >,
1612 revision: u64,
1614 snapshot: Option<Snapshot>,
1619 },
1620}
1621
1622impl<T> TransactionOps<T> {
1623 fn timestamp_determination(self) -> Option<TimestampDetermination<T>> {
1624 match self {
1625 TransactionOps::Peeks { determination, .. } => Some(determination),
1626 TransactionOps::None
1627 | TransactionOps::Subscribe
1628 | TransactionOps::Writes(_)
1629 | TransactionOps::SingleStatement { .. }
1630 | TransactionOps::DDL { .. } => None,
1631 }
1632 }
1633}
1634
1635impl<T> Default for TransactionOps<T> {
1636 fn default() -> Self {
1637 Self::None
1638 }
1639}
1640
/// A single write recorded in a transaction (see [`TransactionOps::Writes`]).
#[derive(Debug, Clone, PartialEq)]
pub struct WriteOp {
    /// Identifier of the catalog item the write is for.
    pub id: CatalogItemId,
    /// The data of the write.
    pub rows: TableData,
}
1649
1650#[derive(Debug)]
1652pub enum RequireLinearization {
1653 Required,
1655 NotRequired,
1657}
1658
1659impl From<&ExplainContext> for RequireLinearization {
1660 fn from(ctx: &ExplainContext) -> Self {
1661 match ctx {
1662 ExplainContext::None | ExplainContext::PlanInsightsNotice(_) => {
1663 RequireLinearization::Required
1664 }
1665 _ => RequireLinearization::NotRequired,
1666 }
1667 }
1668}
1669
1670#[derive(Debug)]
1674pub struct WriteLocks {
1675 locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
1676 conn_id: ConnectionId,
1678}
1679
1680impl WriteLocks {
1681 pub fn builder(sources: impl IntoIterator<Item = CatalogItemId>) -> WriteLocksBuilder {
1686 let locks = sources.into_iter().map(|gid| (gid, None)).collect();
1687 WriteLocksBuilder { locks }
1688 }
1689
1690 pub fn validate(
1693 self,
1694 collections: impl Iterator<Item = CatalogItemId>,
1695 ) -> Result<Self, BTreeSet<CatalogItemId>> {
1696 let mut missing = BTreeSet::new();
1697 for collection in collections {
1698 if !self.locks.contains_key(&collection) {
1699 missing.insert(collection);
1700 }
1701 }
1702
1703 if missing.is_empty() {
1704 Ok(self)
1705 } else {
1706 drop(self);
1708 Err(missing)
1709 }
1710 }
1711}
1712
1713impl Drop for WriteLocks {
1714 fn drop(&mut self) {
1715 if !self.locks.is_empty() {
1717 tracing::info!(
1718 conn_id = %self.conn_id,
1719 locks = ?self.locks,
1720 "dropping write locks",
1721 );
1722 }
1723 }
1724}
1725
1726#[derive(Debug)]
1730pub struct WriteLocksBuilder {
1731 locks: BTreeMap<CatalogItemId, Option<tokio::sync::OwnedMutexGuard<()>>>,
1732}
1733
1734impl WriteLocksBuilder {
1735 pub fn insert_lock(&mut self, id: CatalogItemId, lock: tokio::sync::OwnedMutexGuard<()>) {
1737 self.locks.insert(id, Some(lock));
1738 }
1739
1740 pub fn all_or_nothing(self, conn_id: &ConnectionId) -> Result<WriteLocks, CatalogItemId> {
1745 let (locks, missing): (BTreeMap<_, _>, BTreeSet<_>) =
1746 self.locks
1747 .into_iter()
1748 .partition_map(|(gid, lock)| match lock {
1749 Some(lock) => itertools::Either::Left((gid, lock)),
1750 None => itertools::Either::Right(gid),
1751 });
1752
1753 match missing.iter().next() {
1754 None => {
1755 tracing::info!(%conn_id, ?locks, "acquired write locks");
1756 Ok(WriteLocks {
1757 locks,
1758 conn_id: conn_id.clone(),
1759 })
1760 }
1761 Some(gid) => {
1762 tracing::info!(?missing, "failed to acquire write locks");
1763 drop(locks);
1765 Err(*gid)
1766 }
1767 }
1768 }
1769}
1770
1771#[derive(Debug, Default)]
1820pub(crate) struct GroupCommitWriteLocks {
1821 locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
1822}
1823
1824impl GroupCommitWriteLocks {
1825 pub fn merge(&mut self, mut locks: WriteLocks) {
1827 let existing = std::mem::take(&mut locks.locks);
1831 self.locks.extend(existing);
1832 }
1833
1834 pub fn missing_locks(
1836 &self,
1837 writes: impl Iterator<Item = CatalogItemId>,
1838 ) -> BTreeSet<CatalogItemId> {
1839 let mut missing = BTreeSet::new();
1840 for write in writes {
1841 if !self.locks.contains_key(&write) {
1842 missing.insert(write);
1843 }
1844 }
1845 missing
1846 }
1847}
1848
1849impl Drop for GroupCommitWriteLocks {
1850 fn drop(&mut self) {
1851 if !self.locks.is_empty() {
1852 tracing::info!(
1853 locks = ?self.locks,
1854 "dropping group commit write locks",
1855 );
1856 }
1857 }
1858}