use std::collections::BTreeSet;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};

use bytes::BytesMut;
use mz_catalog::memory::objects::CatalogItem;
use mz_controller_types::ClusterId;
use mz_ore::cast::{CastFrom, CastInto};
use mz_ore::now::{EpochMillis, NowFn, epoch_to_uuid_v7, to_datetime};
use mz_ore::soft_panic_or_log;
use mz_repr::adt::array::ArrayDimension;
use mz_repr::adt::timestamp::TimestampLike;
use mz_repr::{Datum, GlobalId, Row, RowIterator, RowPacker, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{AstInfo, Statement};
use mz_sql::plan::Params;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::SystemVars;
use mz_sql_parser::ast::{StatementKind, statement_kind_label_value};
use qcell::QCell;
use rand::distr::{Bernoulli, Distribution};
use sha2::{Digest, Sha256};
use uuid::Uuid;

use crate::catalog::CatalogState;
use crate::session::{LifecycleTimestamps, Session, TransactionId};
use crate::{AdapterError, CollectionIdBundle, ExecuteResponse};

#[derive(Clone, Debug)]
pub enum StatementLifecycleEvent {
    ExecutionBegan,
    OptimizationFinished,
    StorageDependenciesFinished,
    ComputeDependenciesFinished,
    ExecutionFinished,
}

impl StatementLifecycleEvent {
    pub fn as_str(&self) -> &str {
        match self {
            Self::ExecutionBegan => "execution-began",
            Self::OptimizationFinished => "optimization-finished",
            Self::StorageDependenciesFinished => "storage-dependencies-finished",
            Self::ComputeDependenciesFinished => "compute-dependencies-finished",
            Self::ExecutionFinished => "execution-finished",
        }
    }
}

#[derive(Clone, Debug)]
pub struct StatementBeganExecutionRecord {
    pub id: Uuid,
    pub prepared_statement_id: Uuid,
    pub sample_rate: f64,
    pub params: Vec<Option<String>>,
    pub began_at: EpochMillis,
    pub cluster_id: Option<ClusterId>,
    pub cluster_name: Option<String>,
    pub database_name: String,
    pub search_path: Vec<String>,
    pub application_name: String,
    pub transaction_isolation: String,
    pub execution_timestamp: Option<EpochMillis>,
    pub transaction_id: TransactionId,
    pub transient_index_id: Option<GlobalId>,
    pub mz_version: String,
}

#[derive(Clone, Copy, Debug)]
pub enum StatementExecutionStrategy {
    Standard,
    FastPath,
    PersistFastPath,
    Constant,
}

impl StatementExecutionStrategy {
    pub fn name(&self) -> &'static str {
        match self {
            Self::Standard => "standard",
            Self::FastPath => "fast-path",
            Self::PersistFastPath => "persist-fast-path",
            Self::Constant => "constant",
        }
    }
}

#[derive(Clone, Debug)]
pub enum StatementEndedExecutionReason {
    Success {
        result_size: Option<u64>,
        rows_returned: Option<u64>,
        execution_strategy: Option<StatementExecutionStrategy>,
    },
    Canceled,
    Errored {
        error: String,
    },
    Aborted,
}

#[derive(Clone, Debug)]
pub struct StatementEndedExecutionRecord {
    pub id: Uuid,
    pub reason: StatementEndedExecutionReason,
    pub ended_at: EpochMillis,
}

#[derive(Clone, Debug)]
pub(crate) struct StatementPreparedRecord {
    pub id: Uuid,
    pub sql_hash: [u8; 32],
    pub name: String,
    pub session_id: Uuid,
    pub prepared_at: EpochMillis,
    pub kind: Option<StatementKind>,
}

#[derive(Clone, Debug)]
pub(crate) struct SessionHistoryEvent {
    pub id: Uuid,
    pub connected_at: EpochMillis,
    pub application_name: String,
    pub authenticated_user: String,
}

impl From<&Result<ExecuteResponse, AdapterError>> for StatementEndedExecutionReason {
    fn from(value: &Result<ExecuteResponse, AdapterError>) -> StatementEndedExecutionReason {
        match value {
            Ok(resp) => resp.into(),
            Err(e) => StatementEndedExecutionReason::Errored {
                error: e.to_string(),
            },
        }
    }
}

impl From<&ExecuteResponse> for StatementEndedExecutionReason {
    fn from(value: &ExecuteResponse) -> StatementEndedExecutionReason {
        match value {
            ExecuteResponse::CopyTo { resp, .. } => match resp.as_ref() {
                ExecuteResponse::SendingRowsImmediate { rows, .. } => {
                    let result_size: usize = rows.box_clone().map(|row| row.byte_len()).sum();
                    StatementEndedExecutionReason::Success {
                        result_size: Some(u64::cast_from(result_size)),
                        rows_returned: Some(u64::cast_from(rows.count())),
                        execution_strategy: Some(StatementExecutionStrategy::Constant),
                    }
                }
                ExecuteResponse::SendingRowsStreaming { .. } => {
                    panic!("SELECTs terminate on peek finalization, not here.")
                }
                ExecuteResponse::Subscribing { .. } => {
                    panic!("SUBSCRIBEs terminate in the protocol layer, not here.")
                }
                _ => panic!("Invalid COPY response type"),
            },
            ExecuteResponse::CopyFrom { .. } => {
                panic!("COPY FROMs terminate in the protocol layer, not here.")
            }
            ExecuteResponse::Fetch { .. } => {
                panic!("FETCHes terminate after a follow-up message is sent.")
            }
            ExecuteResponse::SendingRowsStreaming { .. } => {
                panic!("SELECTs terminate on peek finalization, not here.")
            }
            ExecuteResponse::Subscribing { .. } => {
                panic!("SUBSCRIBEs terminate in the protocol layer, not here.")
            }

            ExecuteResponse::SendingRowsImmediate { rows, .. } => {
                let result_size: usize = rows.box_clone().map(|row| row.byte_len()).sum();
                StatementEndedExecutionReason::Success {
                    result_size: Some(u64::cast_from(result_size)),
                    rows_returned: Some(u64::cast_from(rows.count())),
                    execution_strategy: Some(StatementExecutionStrategy::Constant),
                }
            }

            ExecuteResponse::AlteredDefaultPrivileges
            | ExecuteResponse::AlteredObject(_)
            | ExecuteResponse::AlteredRole
            | ExecuteResponse::AlteredSystemConfiguration
            | ExecuteResponse::ClosedCursor
            | ExecuteResponse::Comment
            | ExecuteResponse::Copied(_)
            | ExecuteResponse::CreatedConnection
            | ExecuteResponse::CreatedDatabase
            | ExecuteResponse::CreatedSchema
            | ExecuteResponse::CreatedRole
            | ExecuteResponse::CreatedCluster
            | ExecuteResponse::CreatedClusterReplica
            | ExecuteResponse::CreatedIndex
            | ExecuteResponse::CreatedIntrospectionSubscribe
            | ExecuteResponse::CreatedSecret
            | ExecuteResponse::CreatedSink
            | ExecuteResponse::CreatedSource
            | ExecuteResponse::CreatedTable
            | ExecuteResponse::CreatedView
            | ExecuteResponse::CreatedViews
            | ExecuteResponse::CreatedMaterializedView
            | ExecuteResponse::CreatedContinualTask
            | ExecuteResponse::CreatedType
            | ExecuteResponse::CreatedNetworkPolicy
            | ExecuteResponse::Deallocate { .. }
            | ExecuteResponse::DeclaredCursor
            | ExecuteResponse::Deleted(_)
            | ExecuteResponse::DiscardedTemp
            | ExecuteResponse::DiscardedAll
            | ExecuteResponse::DroppedObject(_)
            | ExecuteResponse::DroppedOwned
            | ExecuteResponse::EmptyQuery
            | ExecuteResponse::GrantedPrivilege
            | ExecuteResponse::GrantedRole
            | ExecuteResponse::Inserted(_)
            | ExecuteResponse::Prepare
            | ExecuteResponse::Raised
            | ExecuteResponse::ReassignOwned
            | ExecuteResponse::RevokedPrivilege
            | ExecuteResponse::RevokedRole
            | ExecuteResponse::SetVariable { .. }
            | ExecuteResponse::StartedTransaction
            | ExecuteResponse::TransactionCommitted { .. }
            | ExecuteResponse::TransactionRolledBack { .. }
            | ExecuteResponse::Updated(_)
            | ExecuteResponse::ValidatedConnection { .. } => {
                StatementEndedExecutionReason::Success {
                    result_size: None,
                    rows_returned: None,
                    execution_strategy: None,
                }
            }
        }
    }
}

mod sealed {
    #[derive(Debug, Copy, Clone)]
    pub struct Private;
}

#[derive(Debug)]
pub enum PreparedStatementLoggingInfo {
    AlreadyLogged { uuid: Uuid },
    StillToLog {
        sql: String,
        redacted_sql: String,
        prepared_at: EpochMillis,
        name: String,
        session_id: Uuid,
        accounted: bool,
        kind: Option<StatementKind>,

        _sealed: sealed::Private,
    },
}

impl PreparedStatementLoggingInfo {
    pub fn still_to_log<A: AstInfo>(
        raw_sql: String,
        stmt: Option<&Statement<A>>,
        prepared_at: EpochMillis,
        name: String,
        session_id: Uuid,
        accounted: bool,
    ) -> Self {
        let kind = stmt.map(StatementKind::from);
        let sql = match kind {
            Some(
                StatementKind::CreateSecret
                | StatementKind::AlterSecret
                | StatementKind::Insert
                | StatementKind::Update
                | StatementKind::Execute,
            ) => stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
            _ => raw_sql,
        };

        PreparedStatementLoggingInfo::StillToLog {
            sql,
            redacted_sql: stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
            prepared_at,
            name,
            session_id,
            accounted,
            kind,
            _sealed: sealed::Private,
        }
    }
}
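
// Note on redaction: for statement kinds that may embed sensitive literals
// (CREATE/ALTER SECRET, INSERT, UPDATE, EXECUTE), `still_to_log` stores the
// redacted rendering in `sql` as well, so raw literal values never reach the
// statement log. As an illustration only (exact rendering depends on
// `to_ast_string_redacted`), an `INSERT INTO t VALUES (42)` would be recorded
// roughly as `INSERT INTO t VALUES ('<REDACTED>')`.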

#[derive(Copy, Clone, Debug, Ord, Eq, PartialOrd, PartialEq)]
pub struct StatementLoggingId(pub Uuid);

#[derive(Debug, Clone)]
pub struct PreparedStatementEvent {
    pub prepared_statement: Row,
    pub sql_text: Row,
    pub session_id: Uuid,
}

#[derive(Debug)]
pub struct ThrottlingState {
    inner: Mutex<ThrottlingStateInner>,
    throttled_count: std::sync::atomic::AtomicUsize,
}

#[derive(Debug)]
struct ThrottlingStateInner {
    tokens: u64,
    last_logged_ts_seconds: u64,
}

impl ThrottlingState {
    pub fn new(now: &NowFn) -> Self {
        Self {
            inner: Mutex::new(ThrottlingStateInner {
                tokens: 0,
                last_logged_ts_seconds: now() / 1000,
            }),
            throttled_count: std::sync::atomic::AtomicUsize::new(0),
        }
    }

    pub fn throttling_check(
        &self,
        cost: u64,
        target_data_rate: u64,
        max_data_credit: Option<u64>,
        now: &NowFn,
    ) -> bool {
        let ts = now() / 1000;
        let mut inner = self.inner.lock().expect("throttling state lock poisoned");
        let elapsed = ts.saturating_sub(inner.last_logged_ts_seconds);
        inner.last_logged_ts_seconds = ts;
        inner.tokens = inner
            .tokens
            .saturating_add(target_data_rate.saturating_mul(elapsed));
        if let Some(max_data_credit) = max_data_credit {
            inner.tokens = inner.tokens.min(max_data_credit);
        }
        if let Some(remaining) = inner.tokens.checked_sub(cost) {
            tracing::debug!("throttling check passed. tokens remaining: {remaining}; cost: {cost}");
            inner.tokens = remaining;
            true
        } else {
            tracing::debug!(
                "throttling check failed. tokens available: {}; cost: {cost}",
                inner.tokens
            );
            false
        }
    }

    pub fn get_throttled_count(&self) -> usize {
        self.throttled_count.load(Ordering::Relaxed)
    }

    pub fn increment_throttled_count(&self) {
        self.throttled_count.fetch_add(1, Ordering::Relaxed);
    }

    pub fn reset_throttled_count(&self) {
        self.throttled_count.store(0, Ordering::Relaxed);
    }
}
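
// Worked example of the token bucket above (illustrative numbers only): with
// `target_data_rate = 100` bytes/second and two seconds elapsed since the last
// check, 200 tokens are credited. A call with `cost = 150` passes and leaves
// 50 tokens; a second call within the same second with `cost = 100` finds only
// 50 tokens and is rejected. `max_data_credit`, when set, caps how many unused
// tokens can accumulate across idle periods.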

#[derive(Debug, Clone)]
pub struct StatementLoggingFrontend {
    pub throttling_state: Arc<ThrottlingState>,
    pub reproducible_rng: Arc<Mutex<rand_chacha::ChaCha8Rng>>,
    pub build_info_human_version: String,
    pub now: NowFn,
}

impl StatementLoggingFrontend {
    fn get_prepared_statement_info(
        &self,
        session: &mut Session,
        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
    ) -> (Option<PreparedStatementEvent>, Uuid) {
        let logging_ref = session.qcell_rw(&*logging);
        let mut prepared_statement_event = None;

        let ps_uuid = match logging_ref {
            PreparedStatementLoggingInfo::AlreadyLogged { uuid } => *uuid,
            PreparedStatementLoggingInfo::StillToLog {
                sql,
                redacted_sql,
                prepared_at,
                name,
                session_id,
                accounted,
                kind,
                _sealed: _,
            } => {
                assert!(
                    *accounted,
                    "accounting for logging should be done in `begin_statement_execution`"
                );
                let uuid = epoch_to_uuid_v7(prepared_at);
                let sql = std::mem::take(sql);
                let redacted_sql = std::mem::take(redacted_sql);
                let sql_hash: [u8; 32] = Sha256::digest(sql.as_bytes()).into();

                let sid = *session_id;

                let record = StatementPreparedRecord {
                    id: uuid,
                    sql_hash,
                    name: std::mem::take(name),
                    session_id: sid,
                    prepared_at: *prepared_at,
                    kind: *kind,
                };

                let mut mpsh_row = Row::default();
                let mut mpsh_packer = mpsh_row.packer();
                pack_statement_prepared_update(&record, &mut mpsh_packer);

                let sql_row = Row::pack([
                    Datum::TimestampTz(
                        to_datetime(*prepared_at)
                            .truncate_day()
                            .try_into()
                            .expect("must fit"),
                    ),
                    Datum::Bytes(sql_hash.as_slice()),
                    Datum::String(sql.as_str()),
                    Datum::String(redacted_sql.as_str()),
                ]);

                let throttled_count = self.throttling_state.get_throttled_count();

                mpsh_packer.push(Datum::UInt64(CastFrom::cast_from(throttled_count)));

                prepared_statement_event = Some(PreparedStatementEvent {
                    prepared_statement: mpsh_row,
                    sql_text: sql_row,
                    session_id: sid,
                });

                *logging_ref = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
                uuid
            }
        };

        (prepared_statement_event, ps_uuid)
    }

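    /// Logs the beginning of a statement execution, if the statement is
    /// sampled.
    ///
    /// Returns `None` when the statement is not sampled, when it comes from an
    /// internal user while internal statement logging is disabled, or when the
    /// throttling budget is exhausted. Otherwise returns the logging ID, the
    /// began-execution record, the packed execution-history row, and (for a
    /// statement that has not been logged before) the prepared-statement event
    /// to emit alongside it.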
    pub fn begin_statement_execution(
        &self,
        session: &mut Session,
        params: &Params,
        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
        system_config: &SystemVars,
        lifecycle_timestamps: Option<LifecycleTimestamps>,
    ) -> Option<(
        StatementLoggingId,
        StatementBeganExecutionRecord,
        Row,
        Option<PreparedStatementEvent>,
    )> {
        let enable_internal_statement_logging = system_config.enable_internal_statement_logging();
        if session.user().is_internal() && !enable_internal_statement_logging {
            return None;
        }

        let sample_rate = effective_sample_rate(session, system_config);

        let use_reproducible_rng = system_config.statement_logging_use_reproducible_rng();
        let target_data_rate: Option<u64> = system_config
            .statement_logging_target_data_rate()
            .map(|rate| rate.cast_into());
        let max_data_credit: Option<u64> = system_config
            .statement_logging_max_data_credit()
            .map(|credit| credit.cast_into());

        let sample = if use_reproducible_rng {
            let mut rng = self.reproducible_rng.lock().expect("rng lock poisoned");
            should_sample_statement(sample_rate, Some(&mut *rng))
        } else {
            should_sample_statement(sample_rate, None)
        };

        let sampled_label = sample.then_some("true").unwrap_or("false");
        session
            .metrics()
            .statement_logging_records(&[sampled_label])
            .inc_by(1);

        let unsampled_bytes_metric = session
            .metrics()
            .statement_logging_unsampled_bytes()
            .clone();
        let actual_bytes_metric = session.metrics().statement_logging_actual_bytes().clone();

        let is_new_prepared_statement = if let Some((sql, accounted)) =
            match session.qcell_rw(logging) {
                PreparedStatementLoggingInfo::AlreadyLogged { .. } => None,
                PreparedStatementLoggingInfo::StillToLog { sql, accounted, .. } => {
                    Some((sql, accounted))
                }
            } {
            if !*accounted {
                unsampled_bytes_metric.inc_by(u64::cast_from(sql.len()));
                if sample {
                    actual_bytes_metric.inc_by(u64::cast_from(sql.len()));
                }
                *accounted = true;
            }
            true
        } else {
            false
        };

        if !sample {
            return None;
        }

        let (prepared_statement_event, ps_uuid) =
            self.get_prepared_statement_info(session, logging);

        let began_at = if let Some(lifecycle_timestamps) = lifecycle_timestamps {
            lifecycle_timestamps.received
        } else {
            (self.now)()
        };

        let current_time = (self.now)();
        let execution_uuid = epoch_to_uuid_v7(&current_time);

        let began_execution = create_began_execution_record(
            execution_uuid,
            ps_uuid,
            sample_rate,
            params,
            session,
            began_at,
            self.build_info_human_version.clone(),
        );

        let mseh_update = pack_statement_began_execution_update(&began_execution);
        let maybe_ps_prepared_statement = prepared_statement_event
            .as_ref()
            .map(|e| &e.prepared_statement);
        let maybe_ps_sql_text = prepared_statement_event.as_ref().map(|e| &e.sql_text);

        let cost: usize = [
            Some(&mseh_update),
            maybe_ps_prepared_statement,
            maybe_ps_sql_text,
        ]
        .into_iter()
        .filter_map(|row_opt| row_opt.map(|row| row.byte_len()))
        .fold(0_usize, |acc, x| acc.saturating_add(x));

        let passed = if let Some(target_data_rate) = target_data_rate {
            self.throttling_state.throttling_check(
                cost.cast_into(),
                target_data_rate,
                max_data_credit,
                &self.now,
            )
        } else {
            true
        };

        if !passed {
            self.throttling_state.increment_throttled_count();
            return None;
        }

        if is_new_prepared_statement {
            self.throttling_state.reset_throttled_count();
        }

        Some((
            StatementLoggingId(execution_uuid),
            began_execution,
            mseh_update,
            prepared_statement_event,
        ))
    }
}

pub(crate) fn effective_sample_rate(session: &Session, system_vars: &SystemVars) -> f64 {
    let system_max: f64 = system_vars
        .statement_logging_max_sample_rate()
        .try_into()
        .expect("value constrained to be convertible to f64");
    let user_rate: f64 = session
        .vars()
        .get_statement_logging_sample_rate()
        .try_into()
        .expect("value constrained to be convertible to f64");
    f64::min(system_max, user_rate)
}

pub(crate) fn should_sample_statement(
    sample_rate: f64,
    reproducible_rng: Option<&mut rand_chacha::ChaCha8Rng>,
) -> bool {
    let distribution = Bernoulli::new(sample_rate).unwrap_or_else(|_| {
        soft_panic_or_log!("statement_logging_sample_rate is out of range [0, 1]");
        Bernoulli::new(0.0).expect("0.0 is valid for Bernoulli")
    });
    if let Some(rng) = reproducible_rng {
        distribution.sample(rng)
    } else {
        distribution.sample(&mut rand::rng())
    }
}
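
// Usage sketch (hypothetical values): with the system-wide
// `statement_logging_max_sample_rate` at 0.01 and a session-level sample rate
// of 0.5, `effective_sample_rate` yields 0.01, and
// `should_sample_statement(0.01, None)` then samples roughly 1% of executions.
// Passing `Some(rng)` with a fixed-seed `ChaCha8Rng` makes the sequence of
// sampling decisions reproducible across runs.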

fn serialize_params(params: &Params) -> Vec<Option<String>> {
    std::iter::zip(params.execute_types.iter(), params.datums.iter())
        .map(|(r#type, datum)| {
            mz_pgrepr::Value::from_datum(datum, r#type).map(|val| {
                let mut buf = BytesMut::new();
                val.encode_text(&mut buf);
                String::from_utf8(Into::<Vec<u8>>::into(buf))
                    .expect("Serialization shouldn't produce non-UTF-8 strings.")
            })
        })
        .collect()
}

pub(crate) fn create_began_execution_record(
    execution_uuid: Uuid,
    prepared_statement_uuid: Uuid,
    sample_rate: f64,
    params: &Params,
    session: &Session,
    began_at: EpochMillis,
    build_info_version: String,
) -> StatementBeganExecutionRecord {
    let params = serialize_params(params);
    StatementBeganExecutionRecord {
        id: execution_uuid,
        prepared_statement_id: prepared_statement_uuid,
        sample_rate,
        params,
        began_at,
        application_name: session.application_name().to_string(),
        transaction_isolation: session.vars().transaction_isolation().to_string(),
        transaction_id: session
            .transaction()
            .inner()
            .map(|t| t.id)
            .unwrap_or_else(|| {
                soft_panic_or_log!(
                    "Statement logging got a statement with no associated transaction"
                );
                9999999
            }),
        mz_version: build_info_version,
        cluster_id: None,
        cluster_name: None,
        execution_timestamp: None,
        transient_index_id: None,
        database_name: session.vars().database().into(),
        search_path: session
            .vars()
            .search_path()
            .iter()
            .map(|s| s.as_str().to_string())
            .collect(),
    }
}

#[derive(Debug, Clone)]
pub enum FrontendStatementLoggingEvent {
    BeganExecution {
        record: StatementBeganExecutionRecord,
        mseh_update: Row,
        prepared_statement: Option<PreparedStatementEvent>,
    },
    EndedExecution(StatementEndedExecutionRecord),
    SetCluster {
        id: StatementLoggingId,
        cluster_id: ClusterId,
    },
    SetTimestamp {
        id: StatementLoggingId,
        timestamp: Timestamp,
    },
    SetTransientIndex {
        id: StatementLoggingId,
        transient_index_id: GlobalId,
    },
    Lifecycle {
        id: StatementLoggingId,
        event: StatementLifecycleEvent,
        when: EpochMillis,
    },
}

pub(crate) fn pack_statement_execution_inner(
    record: &StatementBeganExecutionRecord,
    packer: &mut RowPacker,
) {
    let StatementBeganExecutionRecord {
        id,
        prepared_statement_id,
        sample_rate,
        params,
        began_at,
        cluster_id,
        cluster_name,
        database_name,
        search_path,
        application_name,
        transaction_isolation,
        execution_timestamp,
        transaction_id,
        transient_index_id,
        mz_version,
    } = record;

    let cluster = cluster_id.map(|id| id.to_string());
    let transient_index_id = transient_index_id.map(|id| id.to_string());
    packer.extend([
        Datum::Uuid(*id),
        Datum::Uuid(*prepared_statement_id),
        Datum::Float64((*sample_rate).into()),
        match &cluster {
            None => Datum::Null,
            Some(cluster_id) => Datum::String(cluster_id),
        },
        Datum::String(&*application_name),
        cluster_name.as_ref().map(String::as_str).into(),
        Datum::String(database_name),
    ]);
    packer.push_list(search_path.iter().map(|s| Datum::String(s)));
    packer.extend([
        Datum::String(&*transaction_isolation),
        (*execution_timestamp).into(),
        Datum::UInt64(*transaction_id),
        match &transient_index_id {
            None => Datum::Null,
            Some(transient_index_id) => Datum::String(transient_index_id),
        },
    ]);
    packer
        .try_push_array(
            &[ArrayDimension {
                lower_bound: 1,
                length: params.len(),
            }],
            params
                .iter()
                .map(|p| Datum::from(p.as_ref().map(String::as_str))),
        )
        .expect("correct array dimensions");
    packer.push(Datum::from(mz_version.as_str()));
    packer.push(Datum::TimestampTz(
        to_datetime(*began_at).try_into().expect("Sane system time"),
    ));
}

pub(crate) fn pack_statement_began_execution_update(record: &StatementBeganExecutionRecord) -> Row {
    let mut row = Row::default();
    let mut packer = row.packer();
    pack_statement_execution_inner(record, &mut packer);
    packer.extend([
        Datum::Null,
        Datum::Null,
        Datum::Null,
        Datum::Null,
        Datum::Null,
        Datum::Null,
    ]);
    row
}

pub(crate) fn pack_statement_prepared_update(
    record: &StatementPreparedRecord,
    packer: &mut RowPacker,
) {
    let StatementPreparedRecord {
        id,
        session_id,
        name,
        sql_hash,
        prepared_at,
        kind,
    } = record;
    packer.extend([
        Datum::Uuid(*id),
        Datum::Uuid(*session_id),
        Datum::String(name.as_str()),
        Datum::Bytes(sql_hash.as_slice()),
        Datum::TimestampTz(to_datetime(*prepared_at).try_into().expect("must fit")),
        kind.map(statement_kind_label_value).into(),
    ]);
}

#[derive(Debug)]
pub struct WatchSetCreation {
    pub logging_id: StatementLoggingId,
    pub timestamp: Timestamp,
    pub storage_ids: BTreeSet<GlobalId>,
    pub compute_ids: BTreeSet<GlobalId>,
}

impl WatchSetCreation {
    pub fn new(
        logging_id: StatementLoggingId,
        catalog_state: &CatalogState,
        input_id_bundle: &CollectionIdBundle,
        timestamp: Timestamp,
    ) -> Self {
        let mut storage_ids = BTreeSet::new();
        let mut compute_ids = BTreeSet::new();

        for item_id in input_id_bundle
            .iter()
            .map(|gid| catalog_state.get_entry_by_global_id(&gid).id())
            .flat_map(|id| catalog_state.transitive_uses(id))
        {
            let entry = catalog_state.get_entry(&item_id);
            match entry.item() {
                CatalogItem::Table(_) | CatalogItem::Source(_) => {
                    storage_ids.extend(entry.global_ids());
                }
                CatalogItem::MaterializedView(_) | CatalogItem::Index(_) => {
                    compute_ids.insert(entry.latest_global_id());
                }
                _ => {}
            }
        }

        Self {
            logging_id,
            timestamp,
            storage_ids,
            compute_ids,
        }
    }
}
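
#[cfg(test)]
mod tests {
    // A minimal sketch of tests for the pure helpers above. It assumes that
    // `mz_ore::now::NowFn` can be constructed from a closure via `NowFn::from`;
    // everything else is exercised exactly as defined in this file.
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    use mz_ore::now::NowFn;

    use super::{ThrottlingState, should_sample_statement};

    #[test]
    fn sampling_extremes() {
        // A rate of 0.0 never samples; a rate of 1.0 always samples.
        assert!(!should_sample_statement(0.0, None));
        assert!(should_sample_statement(1.0, None));
    }

    #[test]
    fn throttling_token_bucket() {
        // Drive the token bucket with a hand-controlled clock (milliseconds).
        let clock = Arc::new(AtomicU64::new(0));
        let clock_for_now = Arc::clone(&clock);
        let now: NowFn = NowFn::from(move || clock_for_now.load(Ordering::SeqCst));

        let state = ThrottlingState::new(&now);

        // Two seconds at 100 bytes/s credit 200 tokens, so a cost of 150 passes...
        clock.store(2_000, Ordering::SeqCst);
        assert!(state.throttling_check(150, 100, None, &now));
        // ...and the 50 remaining tokens are not enough for a cost of 100.
        assert!(!state.throttling_check(100, 100, None, &now));
    }
}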