1use std::collections::BTreeSet;
11use std::sync::atomic::Ordering;
12use std::sync::{Arc, Mutex};
13
14use bytes::BytesMut;
15use mz_catalog::memory::objects::CatalogItem;
16use mz_controller_types::ClusterId;
17use mz_ore::cast::{CastFrom, CastInto};
18use mz_ore::now::{EpochMillis, NowFn, epoch_to_uuid_v7, to_datetime};
19use mz_ore::soft_panic_or_log;
20use mz_repr::adt::array::ArrayDimension;
21use mz_repr::adt::timestamp::TimestampLike;
22use mz_repr::{Datum, GlobalId, Row, RowIterator, RowPacker, Timestamp};
23use mz_sql::ast::display::AstDisplay;
24use mz_sql::ast::{AstInfo, Statement};
25use mz_sql::plan::Params;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_sql::session::vars::SystemVars;
28use mz_sql_parser::ast::{StatementKind, statement_kind_label_value};
29use qcell::QCell;
30use rand::distr::{Bernoulli, Distribution};
31use sha2::{Digest, Sha256};
32use uuid::Uuid;
33
34use crate::catalog::CatalogState;
35use crate::session::{LifecycleTimestamps, Session, TransactionId};
36use crate::{AdapterError, CollectionIdBundle, ExecuteResponse};
37
/// Lifecycle phases of a statement execution that are reported to statement
/// lifecycle logging. See [`StatementLifecycleEvent::as_str`] for the labels
/// used when these are recorded.
#[derive(Clone, Debug)]
pub enum StatementLifecycleEvent {
    /// Execution of the statement began.
    ExecutionBegan,
    /// Optimization of the statement finished.
    OptimizationFinished,
    /// The statement's storage dependencies finished (semantics inferred from
    /// the name — see `WatchSetCreation::storage_ids` for what those are).
    StorageDependenciesFinished,
    /// The statement's compute dependencies finished (semantics inferred from
    /// the name — see `WatchSetCreation::compute_ids` for what those are).
    ComputeDependenciesFinished,
    /// Execution of the statement finished.
    ExecutionFinished,
}
46
47impl StatementLifecycleEvent {
48 pub fn as_str(&self) -> &str {
49 match self {
50 Self::ExecutionBegan => "execution-began",
51 Self::OptimizationFinished => "optimization-finished",
52 Self::StorageDependenciesFinished => "storage-dependencies-finished",
53 Self::ComputeDependenciesFinished => "compute-dependencies-finished",
54 Self::ExecutionFinished => "execution-finished",
55 }
56 }
57}
58
/// Everything known about a statement execution at the time it begins.
///
/// Fields that only become known later in the execution (cluster, execution
/// timestamp, transient index) start out as `None` and are filled in by
/// follow-up events (see `FrontendStatementLoggingEvent`).
#[derive(Clone, Debug)]
pub struct StatementBeganExecutionRecord {
    /// UUID identifying this execution.
    pub id: Uuid,
    /// UUID of the prepared statement being executed.
    pub prepared_statement_id: Uuid,
    /// Effective sample rate under which this execution was sampled.
    pub sample_rate: f64,
    /// Bound parameters rendered as text; `None` for NULL parameters.
    pub params: Vec<Option<String>>,
    /// When execution began, in milliseconds since the Unix epoch.
    pub began_at: EpochMillis,
    /// Cluster the statement ran on, once known.
    pub cluster_id: Option<ClusterId>,
    /// Name of that cluster, once known.
    pub cluster_name: Option<String>,
    /// The session's current database.
    pub database_name: String,
    /// The session's search path.
    pub search_path: Vec<String>,
    /// The session's application name.
    pub application_name: String,
    /// The session's transaction isolation level.
    pub transaction_isolation: String,
    /// Timestamp the statement executed at, once known.
    pub execution_timestamp: Option<EpochMillis>,
    /// ID of the transaction this execution belongs to.
    pub transaction_id: TransactionId,
    /// Transient index created for this execution, if any.
    pub transient_index_id: Option<GlobalId>,
    /// Human-readable Materialize build version that ran the statement.
    pub mz_version: String,
}
80
/// How a statement's results were computed; recorded in the statement log.
#[derive(Clone, Copy, Debug)]
pub enum StatementExecutionStrategy {
    /// The default execution path.
    Standard,
    /// A fast-path execution (presumably a direct index peek — TODO confirm).
    FastPath,
    /// A fast-path execution against persist (presumably a direct persist
    /// read — TODO confirm).
    PersistFastPath,
    /// The result was a constant available immediately; used for
    /// `SendingRowsImmediate` responses.
    Constant,
}
95
96impl StatementExecutionStrategy {
97 pub fn name(&self) -> &'static str {
98 match self {
99 Self::Standard => "standard",
100 Self::FastPath => "fast-path",
101 Self::PersistFastPath => "persist-fast-path",
102 Self::Constant => "constant",
103 }
104 }
105}
106
/// Why a statement execution ended; recorded in the statement log.
#[derive(Clone, Debug)]
pub enum StatementEndedExecutionReason {
    /// The execution completed successfully.
    Success {
        /// Total result size in bytes, when the response carried rows.
        result_size: Option<u64>,
        /// Number of rows returned, when the response carried rows.
        rows_returned: Option<u64>,
        /// Strategy used to compute the result, when applicable.
        execution_strategy: Option<StatementExecutionStrategy>,
    },
    /// The execution was canceled.
    Canceled,
    /// The execution failed.
    Errored {
        /// The rendered error message.
        error: String,
    },
    /// The execution was aborted (exact trigger not visible here — TODO
    /// confirm against the callers that construct this variant).
    Aborted,
}
123
/// Record emitted when a statement execution ends.
#[derive(Clone, Debug)]
pub struct StatementEndedExecutionRecord {
    /// UUID of the execution; matches `StatementBeganExecutionRecord::id`.
    pub id: Uuid,
    /// Why the execution ended.
    pub reason: StatementEndedExecutionReason,
    /// When the execution ended, in milliseconds since the Unix epoch.
    pub ended_at: EpochMillis,
}
130
/// Record emitted the first time a prepared statement is logged.
#[derive(Clone, Debug)]
pub(crate) struct StatementPreparedRecord {
    /// UUID of the prepared statement.
    pub id: Uuid,
    /// SHA-256 hash of the statement's (possibly redacted) SQL text.
    pub sql_hash: [u8; 32],
    /// Name of the prepared statement.
    pub name: String,
    /// UUID of the session that prepared the statement.
    pub session_id: Uuid,
    /// When the statement was prepared, in milliseconds since the Unix epoch.
    pub prepared_at: EpochMillis,
    /// Kind of the statement, when it parsed to a known kind.
    pub kind: Option<StatementKind>,
}
142
/// Record describing a session, for session-history logging.
#[derive(Clone, Debug)]
pub(crate) struct SessionHistoryEvent {
    /// UUID of the session.
    pub id: Uuid,
    /// When the session connected, in milliseconds since the Unix epoch.
    pub connected_at: EpochMillis,
    /// Application name the session connected with.
    pub application_name: String,
    /// Name of the authenticated user.
    pub authenticated_user: String,
}
150
151impl From<&Result<ExecuteResponse, AdapterError>> for StatementEndedExecutionReason {
152 fn from(value: &Result<ExecuteResponse, AdapterError>) -> StatementEndedExecutionReason {
153 match value {
154 Ok(resp) => resp.into(),
155 Err(e) => StatementEndedExecutionReason::Errored {
156 error: e.to_string(),
157 },
158 }
159 }
160}
161
impl From<&ExecuteResponse> for StatementEndedExecutionReason {
    /// Derives the execution-end reason from a final `ExecuteResponse`.
    ///
    /// Panics on response types whose executions are terminated elsewhere
    /// (peeks, subscribes, COPY FROM, FETCH) — reaching those arms here is a
    /// coordinator bug, not a runtime condition.
    fn from(value: &ExecuteResponse) -> StatementEndedExecutionReason {
        match value {
            // COPY TO wraps an inner response; only an immediately-available
            // row set is a valid terminal state here.
            ExecuteResponse::CopyTo { resp, .. } => match resp.as_ref() {
                ExecuteResponse::SendingRowsImmediate { rows, .. } => {
                    // Clone the iterator to measure byte size without
                    // consuming the original row set.
                    let result_size: usize = rows.box_clone().map(|row| row.byte_len()).sum();
                    StatementEndedExecutionReason::Success {
                        result_size: Some(u64::cast_from(result_size)),
                        rows_returned: Some(u64::cast_from(rows.count())),
                        execution_strategy: Some(StatementExecutionStrategy::Constant),
                    }
                }
                ExecuteResponse::SendingRowsStreaming { .. } => {
                    panic!("SELECTs terminate on peek finalization, not here.")
                }
                ExecuteResponse::Subscribing { .. } => {
                    panic!("SUBSCRIBEs terminate in the protocol layer, not here.")
                }
                _ => panic!("Invalid COPY response type"),
            },
            ExecuteResponse::CopyFrom { .. } => {
                panic!("COPY FROMs terminate in the protocol layer, not here.")
            }
            ExecuteResponse::Fetch { .. } => {
                panic!("FETCHes terminate after a follow-up message is sent.")
            }
            ExecuteResponse::SendingRowsStreaming { .. } => {
                panic!("SELECTs terminate on peek finalization, not here.")
            }
            ExecuteResponse::Subscribing { .. } => {
                panic!("SUBSCRIBEs terminate in the protocol layer, not here.")
            }

            // Rows available immediately: a constant result.
            ExecuteResponse::SendingRowsImmediate { rows, .. } => {
                let result_size: usize = rows.box_clone().map(|row| row.byte_len()).sum();
                StatementEndedExecutionReason::Success {
                    result_size: Some(u64::cast_from(result_size)),
                    rows_returned: Some(u64::cast_from(rows.count())),
                    execution_strategy: Some(StatementExecutionStrategy::Constant),
                }
            }

            // Everything else is a row-less success: DDL, DML counts,
            // transaction control, session configuration, and similar.
            ExecuteResponse::AlteredDefaultPrivileges
            | ExecuteResponse::AlteredObject(_)
            | ExecuteResponse::AlteredRole
            | ExecuteResponse::AlteredSystemConfiguration
            | ExecuteResponse::ClosedCursor
            | ExecuteResponse::Comment
            | ExecuteResponse::Copied(_)
            | ExecuteResponse::CreatedConnection
            | ExecuteResponse::CreatedDatabase
            | ExecuteResponse::CreatedSchema
            | ExecuteResponse::CreatedRole
            | ExecuteResponse::CreatedCluster
            | ExecuteResponse::CreatedClusterReplica
            | ExecuteResponse::CreatedIndex
            | ExecuteResponse::CreatedIntrospectionSubscribe
            | ExecuteResponse::CreatedSecret
            | ExecuteResponse::CreatedSink
            | ExecuteResponse::CreatedSource
            | ExecuteResponse::CreatedTable
            | ExecuteResponse::CreatedView
            | ExecuteResponse::CreatedViews
            | ExecuteResponse::CreatedMaterializedView
            | ExecuteResponse::CreatedContinualTask
            | ExecuteResponse::CreatedType
            | ExecuteResponse::CreatedNetworkPolicy
            | ExecuteResponse::Deallocate { .. }
            | ExecuteResponse::DeclaredCursor
            | ExecuteResponse::Deleted(_)
            | ExecuteResponse::DiscardedTemp
            | ExecuteResponse::DiscardedAll
            | ExecuteResponse::DroppedObject(_)
            | ExecuteResponse::DroppedOwned
            | ExecuteResponse::EmptyQuery
            | ExecuteResponse::GrantedPrivilege
            | ExecuteResponse::GrantedRole
            | ExecuteResponse::Inserted(_)
            | ExecuteResponse::Prepare
            | ExecuteResponse::Raised
            | ExecuteResponse::ReassignOwned
            | ExecuteResponse::RevokedPrivilege
            | ExecuteResponse::RevokedRole
            | ExecuteResponse::SetVariable { .. }
            | ExecuteResponse::StartedTransaction
            | ExecuteResponse::TransactionCommitted { .. }
            | ExecuteResponse::TransactionRolledBack { .. }
            | ExecuteResponse::Updated(_)
            | ExecuteResponse::ValidatedConnection { .. } => {
                StatementEndedExecutionReason::Success {
                    result_size: None,
                    rows_returned: None,
                    execution_strategy: None,
                }
            }
        }
    }
}
270
mod sealed {
    /// Zero-sized token whose privacy (this module is private) prevents code
    /// outside this file from constructing values that embed it, e.g.
    /// `PreparedStatementLoggingInfo::StillToLog`.
    #[derive(Debug, Copy, Clone)]
    pub struct Private;
}
277
/// Logging state of a prepared statement: either it has already been written
/// to the statement log (in which case only its UUID is retained), or it is
/// still pending and carries everything needed to log it on first execution.
#[derive(Debug)]
pub enum PreparedStatementLoggingInfo {
    /// The statement has been logged; `uuid` identifies its log entry.
    AlreadyLogged { uuid: Uuid },
    /// The statement has not yet been logged. Construct only via
    /// [`PreparedStatementLoggingInfo::still_to_log`]; the `_sealed` field
    /// prevents construction elsewhere.
    StillToLog {
        /// The SQL text to log. For statement kinds whose text may embed
        /// sensitive values (secrets, inserted data), this is already the
        /// redacted rendering.
        sql: String,
        /// Redacted rendering of the statement (empty if no parsed statement
        /// was available).
        redacted_sql: String,
        /// When the statement was prepared, in epoch milliseconds.
        prepared_at: EpochMillis,
        /// Name of the prepared statement.
        name: String,
        /// UUID of the owning session.
        session_id: Uuid,
        /// Whether the SQL text's size has already been counted in the
        /// logging metrics; asserted/updated in `begin_statement_execution`.
        accounted: bool,
        /// Kind of the statement, when known.
        kind: Option<StatementKind>,
        /// Private token restricting construction to this module.
        _sealed: sealed::Private,
    },
}
309
310impl PreparedStatementLoggingInfo {
311 pub fn still_to_log<A: AstInfo>(
314 raw_sql: String,
315 stmt: Option<&Statement<A>>,
316 prepared_at: EpochMillis,
317 name: String,
318 session_id: Uuid,
319 accounted: bool,
320 ) -> Self {
321 let kind = stmt.map(StatementKind::from);
322 let sql = match kind {
323 Some(
328 StatementKind::CreateSecret
329 | StatementKind::AlterSecret
330 | StatementKind::Insert
331 | StatementKind::Update
332 | StatementKind::Execute,
333 ) => stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
334 _ => raw_sql,
335 };
336
337 PreparedStatementLoggingInfo::StillToLog {
338 sql,
339 redacted_sql: stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
340 prepared_at,
341 name,
342 session_id,
343 accounted,
344 kind,
345 _sealed: sealed::Private,
346 }
347 }
348}
349
/// UUID under which a single statement execution is logged.
#[derive(Copy, Clone, Debug, Ord, Eq, PartialOrd, PartialEq)]
pub struct StatementLoggingId(pub Uuid);
352
/// Rows emitted the first time a prepared statement is logged.
#[derive(Debug, Clone)]
pub struct PreparedStatementEvent {
    /// Row for the prepared-statement history collection (includes the
    /// throttled-execution count appended by `get_prepared_statement_info`).
    pub prepared_statement: Row,
    /// Row for the SQL-text collection, keyed by (day, SQL hash).
    pub sql_text: Row,
    /// UUID of the session that prepared the statement.
    pub session_id: Uuid,
}
360
/// Shared state for rate-limiting statement-logging output by data volume,
/// using a token-bucket scheme (see [`ThrottlingState::throttling_check`]).
#[derive(Debug)]
pub struct ThrottlingState {
    // Token bucket, guarded by a mutex since it is updated read-modify-write.
    inner: Mutex<ThrottlingStateInner>,
    // Number of executions dropped by throttling since the last reset; read
    // when a prepared statement is first logged, then reset.
    throttled_count: std::sync::atomic::AtomicUsize,
}
378
/// Mutex-guarded interior of [`ThrottlingState`].
#[derive(Debug)]
struct ThrottlingStateInner {
    // Tokens (units of logging budget) currently available in the bucket.
    tokens: u64,
    // Wall-clock second (epoch millis / 1000) at which the bucket was last
    // refilled.
    last_logged_ts_seconds: u64,
}
388
389impl ThrottlingState {
390 pub fn new(now: &NowFn) -> Self {
392 Self {
393 inner: Mutex::new(ThrottlingStateInner {
394 tokens: 0,
395 last_logged_ts_seconds: now() / 1000,
396 }),
397 throttled_count: std::sync::atomic::AtomicUsize::new(0),
398 }
399 }
400
401 pub fn throttling_check(
408 &self,
409 cost: u64,
410 target_data_rate: u64,
411 max_data_credit: Option<u64>,
412 now: &NowFn,
413 ) -> bool {
414 let ts = now() / 1000;
415 let mut inner = self.inner.lock().expect("throttling state lock poisoned");
416 let elapsed = ts.saturating_sub(inner.last_logged_ts_seconds);
419 inner.last_logged_ts_seconds = ts;
420 inner.tokens = inner
421 .tokens
422 .saturating_add(target_data_rate.saturating_mul(elapsed));
423 if let Some(max_data_credit) = max_data_credit {
424 inner.tokens = inner.tokens.min(max_data_credit);
425 }
426 if let Some(remaining) = inner.tokens.checked_sub(cost) {
427 tracing::debug!("throttling check passed. tokens remaining: {remaining}; cost: {cost}");
428 inner.tokens = remaining;
429 true
430 } else {
431 tracing::debug!(
432 "throttling check failed. tokens available: {}; cost: {cost}",
433 inner.tokens
434 );
435 false
436 }
437 }
438
439 pub fn get_throttled_count(&self) -> usize {
440 self.throttled_count.load(Ordering::Relaxed)
441 }
442
443 pub fn increment_throttled_count(&self) {
444 self.throttled_count.fetch_add(1, Ordering::Relaxed);
445 }
446
447 pub fn reset_throttled_count(&self) {
448 self.throttled_count.store(0, Ordering::Relaxed);
449 }
450}
451
/// Session-side entry point for statement logging: decides whether an
/// execution is sampled and throttled, and produces the corresponding log
/// records and rows.
#[derive(Debug, Clone)]
pub struct StatementLoggingFrontend {
    /// Shared token-bucket throttling state.
    pub throttling_state: Arc<ThrottlingState>,
    /// RNG used for sampling decisions when
    /// `statement_logging_use_reproducible_rng` is enabled.
    pub reproducible_rng: Arc<Mutex<rand_chacha::ChaCha8Rng>>,
    /// Human-readable build version recorded in each execution record.
    pub build_info_human_version: String,
    /// Clock used for timestamps and throttling.
    pub now: NowFn,
}
468
469impl StatementLoggingFrontend {
470 fn get_prepared_statement_info(
486 &self,
487 session: &mut Session,
488 logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
489 ) -> (Option<PreparedStatementEvent>, Uuid) {
490 let logging_ref = session.qcell_rw(&*logging);
491 let mut prepared_statement_event = None;
492
493 let ps_uuid = match logging_ref {
494 PreparedStatementLoggingInfo::AlreadyLogged { uuid } => *uuid,
495 PreparedStatementLoggingInfo::StillToLog {
496 sql,
497 redacted_sql,
498 prepared_at,
499 name,
500 session_id,
501 accounted,
502 kind,
503 _sealed: _,
504 } => {
505 assert!(
506 *accounted,
507 "accounting for logging should be done in `begin_statement_execution`"
508 );
509 let uuid = epoch_to_uuid_v7(prepared_at);
510 let sql = std::mem::take(sql);
511 let redacted_sql = std::mem::take(redacted_sql);
512 let sql_hash: [u8; 32] = Sha256::digest(sql.as_bytes()).into();
513
514 let sid = *session_id;
516
517 let record = StatementPreparedRecord {
518 id: uuid,
519 sql_hash,
520 name: std::mem::take(name),
521 session_id: sid,
522 prepared_at: *prepared_at,
523 kind: *kind,
524 };
525
526 let mut mpsh_row = Row::default();
528 let mut mpsh_packer = mpsh_row.packer();
529 pack_statement_prepared_update(&record, &mut mpsh_packer);
530
531 let sql_row = Row::pack([
532 Datum::TimestampTz(
533 to_datetime(*prepared_at)
534 .truncate_day()
535 .try_into()
536 .expect("must fit"),
537 ),
538 Datum::Bytes(sql_hash.as_slice()),
539 Datum::String(sql.as_str()),
540 Datum::String(redacted_sql.as_str()),
541 ]);
542
543 let throttled_count = self.throttling_state.get_throttled_count();
545
546 mpsh_packer.push(Datum::UInt64(CastFrom::cast_from(throttled_count)));
547
548 prepared_statement_event = Some(PreparedStatementEvent {
549 prepared_statement: mpsh_row,
550 sql_text: sql_row,
551 session_id: sid,
552 });
553
554 *logging_ref = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
555 uuid
556 }
557 };
558
559 (prepared_statement_event, ps_uuid)
560 }
561
562 pub fn begin_statement_execution(
577 &self,
578 session: &mut Session,
579 params: &Params,
580 logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
581 system_config: &SystemVars,
582 lifecycle_timestamps: Option<LifecycleTimestamps>,
583 ) -> Option<(
584 StatementLoggingId,
585 StatementBeganExecutionRecord,
586 Row,
587 Option<PreparedStatementEvent>,
588 )> {
589 let enable_internal_statement_logging = system_config.enable_internal_statement_logging();
591 if session.user().is_internal() && !enable_internal_statement_logging {
592 return None;
593 }
594
595 let sample_rate = effective_sample_rate(session, system_config);
596
597 let use_reproducible_rng = system_config.statement_logging_use_reproducible_rng();
598 let target_data_rate: Option<u64> = system_config
599 .statement_logging_target_data_rate()
600 .map(|rate| rate.cast_into());
601 let max_data_credit: Option<u64> = system_config
602 .statement_logging_max_data_credit()
603 .map(|credit| credit.cast_into());
604
605 let sample = if use_reproducible_rng {
607 let mut rng = self.reproducible_rng.lock().expect("rng lock poisoned");
608 should_sample_statement(sample_rate, Some(&mut *rng))
609 } else {
610 should_sample_statement(sample_rate, None)
611 };
612
613 let sampled_label = sample.then_some("true").unwrap_or("false");
614 session
615 .metrics()
616 .statement_logging_records(&[sampled_label])
617 .inc_by(1);
618
619 let unsampled_bytes_metric = session
621 .metrics()
622 .statement_logging_unsampled_bytes()
623 .clone();
624 let actual_bytes_metric = session.metrics().statement_logging_actual_bytes().clone();
625
626 let is_new_prepared_statement = if let Some((sql, accounted)) =
628 match session.qcell_rw(logging) {
629 PreparedStatementLoggingInfo::AlreadyLogged { .. } => None,
630 PreparedStatementLoggingInfo::StillToLog { sql, accounted, .. } => {
631 Some((sql, accounted))
632 }
633 } {
634 if !*accounted {
635 unsampled_bytes_metric.inc_by(u64::cast_from(sql.len()));
636 if sample {
637 actual_bytes_metric.inc_by(u64::cast_from(sql.len()));
638 }
639 *accounted = true;
640 }
641 true
642 } else {
643 false
644 };
645
646 if !sample {
647 return None;
648 }
649
650 let (prepared_statement_event, ps_uuid) =
652 self.get_prepared_statement_info(session, logging);
653
654 let began_at = if let Some(lifecycle_timestamps) = lifecycle_timestamps {
655 lifecycle_timestamps.received
656 } else {
657 (self.now)()
658 };
659
660 let current_time = (self.now)();
661 let execution_uuid = epoch_to_uuid_v7(¤t_time);
662
663 let began_execution = create_began_execution_record(
665 execution_uuid,
666 ps_uuid,
667 sample_rate,
668 params,
669 session,
670 began_at,
671 self.build_info_human_version.clone(),
672 );
673
674 let mseh_update = pack_statement_began_execution_update(&began_execution);
676 let maybe_ps_prepared_statement = prepared_statement_event
677 .as_ref()
678 .map(|e| &e.prepared_statement);
679 let maybe_ps_sql_text = prepared_statement_event.as_ref().map(|e| &e.sql_text);
680
681 let cost: usize = [
683 Some(&mseh_update),
684 maybe_ps_prepared_statement,
685 maybe_ps_sql_text,
686 ]
687 .into_iter()
688 .filter_map(|row_opt| row_opt.map(|row| row.byte_len()))
689 .fold(0_usize, |acc, x| acc.saturating_add(x));
690
691 let passed = if let Some(target_data_rate) = target_data_rate {
693 self.throttling_state.throttling_check(
694 cost.cast_into(),
695 target_data_rate,
696 max_data_credit,
697 &self.now,
698 )
699 } else {
700 true };
702
703 if !passed {
704 self.throttling_state.increment_throttled_count();
706 return None;
707 }
708
709 if is_new_prepared_statement {
712 self.throttling_state.reset_throttled_count();
713 }
714
715 Some((
716 StatementLoggingId(execution_uuid),
717 began_execution,
718 mseh_update,
719 prepared_statement_event,
720 ))
721 }
722}
723
724pub(crate) fn effective_sample_rate(session: &Session, system_vars: &SystemVars) -> f64 {
728 let system_max: f64 = system_vars
729 .statement_logging_max_sample_rate()
730 .try_into()
731 .expect("value constrained to be convertible to f64");
732 let user_rate: f64 = session
733 .vars()
734 .get_statement_logging_sample_rate()
735 .try_into()
736 .expect("value constrained to be convertible to f64");
737 f64::min(system_max, user_rate)
738}
739
740pub(crate) fn should_sample_statement(
746 sample_rate: f64,
747 reproducible_rng: Option<&mut rand_chacha::ChaCha8Rng>,
748) -> bool {
749 let distribution = Bernoulli::new(sample_rate).unwrap_or_else(|_| {
750 soft_panic_or_log!("statement_logging_sample_rate is out of range [0, 1]");
751 Bernoulli::new(0.0).expect("0.0 is valid for Bernoulli")
752 });
753 if let Some(rng) = reproducible_rng {
754 distribution.sample(rng)
755 } else {
756 distribution.sample(&mut rand::rng())
757 }
758}
759
760fn serialize_params(params: &Params) -> Vec<Option<String>> {
762 std::iter::zip(params.execute_types.iter(), params.datums.iter())
763 .map(|(r#type, datum)| {
764 mz_pgrepr::Value::from_datum(datum, r#type).map(|val| {
765 let mut buf = BytesMut::new();
766 val.encode_text(&mut buf);
767 String::from_utf8(Into::<Vec<u8>>::into(buf))
768 .expect("Serialization shouldn't produce non-UTF-8 strings.")
769 })
770 })
771 .collect()
772}
773
/// Builds the [`StatementBeganExecutionRecord`] for a newly-begun execution
/// from the session's current state.
///
/// Fields not yet known at execution start (cluster, execution timestamp,
/// transient index) are initialized to `None` and filled in by later events.
pub(crate) fn create_began_execution_record(
    execution_uuid: Uuid,
    prepared_statement_uuid: Uuid,
    sample_rate: f64,
    params: &Params,
    session: &Session,
    began_at: EpochMillis,
    build_info_version: String,
) -> StatementBeganExecutionRecord {
    let params = serialize_params(params);
    StatementBeganExecutionRecord {
        id: execution_uuid,
        prepared_statement_id: prepared_statement_uuid,
        sample_rate,
        params,
        began_at,
        application_name: session.application_name().to_string(),
        transaction_isolation: session.vars().transaction_isolation().to_string(),
        transaction_id: session
            .transaction()
            .inner()
            .map(|t| t.id)
            .unwrap_or_else(|| {
                // Every executing statement should belong to a transaction;
                // log the violation and fall back to a sentinel ID rather
                // than panic in production.
                soft_panic_or_log!(
                    "Statement logging got a statement with no associated transaction"
                );
                9999999
            }),
        mz_version: build_info_version,
        cluster_id: None,
        cluster_name: None,
        execution_timestamp: None,
        transient_index_id: None,
        database_name: session.vars().database().into(),
        search_path: session
            .vars()
            .search_path()
            .iter()
            .map(|s| s.as_str().to_string())
            .collect(),
    }
}
820
/// Statement-logging events produced on the session/frontend side and
/// consumed by whatever component persists the statement log.
#[derive(Debug, Clone)]
pub enum FrontendStatementLoggingEvent {
    /// A sampled statement began executing.
    BeganExecution {
        /// The began-execution record.
        record: StatementBeganExecutionRecord,
        /// Pre-packed statement-execution-history row for `record`.
        mseh_update: Row,
        /// Present iff this execution is the first for its prepared
        /// statement.
        prepared_statement: Option<PreparedStatementEvent>,
    },
    /// A previously-began execution ended.
    EndedExecution(StatementEndedExecutionRecord),
    /// The cluster the statement ran on became known.
    SetCluster {
        id: StatementLoggingId,
        cluster_id: ClusterId,
    },
    /// The execution timestamp became known.
    SetTimestamp {
        id: StatementLoggingId,
        timestamp: Timestamp,
    },
    /// A transient index was created for the execution.
    SetTransientIndex {
        id: StatementLoggingId,
        transient_index_id: GlobalId,
    },
    /// A lifecycle phase (see [`StatementLifecycleEvent`]) was reached at
    /// `when` (epoch milliseconds).
    Lifecycle {
        id: StatementLoggingId,
        event: StatementLifecycleEvent,
        when: EpochMillis,
    },
}
857
/// Packs the columns shared by all statement-execution-history updates into
/// `packer`.
///
/// The push order below defines the row's column order and must match the
/// target collection's schema; do not reorder.
pub(crate) fn pack_statement_execution_inner(
    record: &StatementBeganExecutionRecord,
    packer: &mut RowPacker,
) {
    // Destructure exhaustively so that adding a field to the record forces
    // this function to be revisited.
    let StatementBeganExecutionRecord {
        id,
        prepared_statement_id,
        sample_rate,
        params,
        began_at,
        cluster_id,
        cluster_name,
        database_name,
        search_path,
        application_name,
        transaction_isolation,
        execution_timestamp,
        transaction_id,
        transient_index_id,
        mz_version,
    } = record;

    // IDs are logged as (nullable) strings.
    let cluster = cluster_id.map(|id| id.to_string());
    let transient_index_id = transient_index_id.map(|id| id.to_string());
    packer.extend([
        Datum::Uuid(*id),
        Datum::Uuid(*prepared_statement_id),
        Datum::Float64((*sample_rate).into()),
        match &cluster {
            None => Datum::Null,
            Some(cluster_id) => Datum::String(cluster_id),
        },
        Datum::String(&*application_name),
        cluster_name.as_ref().map(String::as_str).into(),
        Datum::String(database_name),
    ]);
    // The search path is packed as a list of strings.
    packer.push_list(search_path.iter().map(|s| Datum::String(s)));
    packer.extend([
        Datum::String(&*transaction_isolation),
        (*execution_timestamp).into(),
        Datum::UInt64(*transaction_id),
        match &transient_index_id {
            None => Datum::Null,
            Some(transient_index_id) => Datum::String(transient_index_id),
        },
    ]);
    // Parameters become a 1-dimensional array; NULL parameters are NULL
    // elements.
    packer
        .try_push_array(
            &[ArrayDimension {
                lower_bound: 1,
                length: params.len(),
            }],
            params
                .iter()
                .map(|p| Datum::from(p.as_ref().map(String::as_str))),
        )
        .expect("correct array dimensions");
    packer.push(Datum::from(mz_version.as_str()));
    packer.push(Datum::TimestampTz(
        to_datetime(*began_at).try_into().expect("Sane system time"),
    ));
}
920
921pub(crate) fn pack_statement_began_execution_update(record: &StatementBeganExecutionRecord) -> Row {
922 let mut row = Row::default();
923 let mut packer = row.packer();
924 pack_statement_execution_inner(record, &mut packer);
925 packer.extend([
926 Datum::Null,
928 Datum::Null,
930 Datum::Null,
932 Datum::Null,
934 Datum::Null,
936 Datum::Null,
938 ]);
939 row
940}
941
/// Packs a prepared-statement record into `packer`.
///
/// The push order defines the row's column order and must match the target
/// collection's schema. Note that a final throttled-count column is appended
/// separately by the caller (see `get_prepared_statement_info`).
pub(crate) fn pack_statement_prepared_update(
    record: &StatementPreparedRecord,
    packer: &mut RowPacker,
) {
    // Destructure exhaustively so a new field forces this to be revisited.
    let StatementPreparedRecord {
        id,
        session_id,
        name,
        sql_hash,
        prepared_at,
        kind,
    } = record;
    packer.extend([
        Datum::Uuid(*id),
        Datum::Uuid(*session_id),
        Datum::String(name.as_str()),
        Datum::Bytes(sql_hash.as_slice()),
        Datum::TimestampTz(to_datetime(*prepared_at).try_into().expect("must fit")),
        kind.map(statement_kind_label_value).into(),
    ]);
}
963
/// Inputs needed to create watch sets tracking when a statement's
/// dependencies have caught up to its timestamp.
#[derive(Debug)]
pub struct WatchSetCreation {
    /// Logging ID of the statement execution being watched.
    pub logging_id: StatementLoggingId,
    /// Timestamp the dependencies must reach.
    pub timestamp: Timestamp,
    /// Storage collections (tables and sources) the statement transitively
    /// depends on.
    pub storage_ids: BTreeSet<GlobalId>,
    /// Compute collections (indexes and materialized views) the statement
    /// transitively depends on.
    pub compute_ids: BTreeSet<GlobalId>,
}
977
978impl WatchSetCreation {
979 pub fn new(
982 logging_id: StatementLoggingId,
983 catalog_state: &CatalogState,
984 input_id_bundle: &CollectionIdBundle,
985 timestamp: Timestamp,
986 ) -> Self {
987 let mut storage_ids = BTreeSet::new();
988 let mut compute_ids = BTreeSet::new();
989
990 for item_id in input_id_bundle
991 .iter()
992 .map(|gid| catalog_state.get_entry_by_global_id(&gid).id())
993 .flat_map(|id| catalog_state.transitive_uses(id))
994 {
995 let entry = catalog_state.get_entry(&item_id);
996 match entry.item() {
997 CatalogItem::Table(_) | CatalogItem::Source(_) => {
1002 storage_ids.extend(entry.global_ids());
1003 }
1004 CatalogItem::MaterializedView(_) | CatalogItem::Index(_) => {
1007 compute_ids.insert(entry.latest_global_id());
1008 }
1009 _ => {}
1010 }
1011 }
1012
1013 Self {
1014 logging_id,
1015 timestamp,
1016 storage_ids,
1017 compute_ids,
1018 }
1019 }
1020}