1use std::collections::BTreeMap;
11use std::sync::Arc;
12
13use bytes::BytesMut;
14use mz_controller_types::ClusterId;
15use mz_ore::now::{NowFn, epoch_to_uuid_v7, to_datetime};
16use mz_ore::task::spawn;
17use mz_ore::{cast::CastFrom, cast::CastInto, now::EpochMillis};
18use mz_repr::adt::array::ArrayDimension;
19use mz_repr::adt::timestamp::TimestampLike;
20use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker, Timestamp};
21use mz_sql::ast::display::AstDisplay;
22use mz_sql::ast::{AstInfo, Statement};
23use mz_sql::plan::Params;
24use mz_sql::session::metadata::SessionMetadata;
25use mz_sql_parser::ast::{StatementKind, statement_kind_label_value};
26use mz_storage_client::controller::IntrospectionType;
27use qcell::QCell;
28use rand::SeedableRng;
29use rand::distr::{Bernoulli, Distribution};
30use sha2::{Digest, Sha256};
31use tokio::time::MissedTickBehavior;
32use tracing::debug;
33use uuid::Uuid;
34
35use crate::coord::{ConnMeta, Coordinator};
36use crate::session::{LifecycleTimestamps, Session};
37use crate::statement_logging::{
38 SessionHistoryEvent, StatementBeganExecutionRecord, StatementEndedExecutionReason,
39 StatementEndedExecutionRecord, StatementLifecycleEvent, StatementPreparedRecord,
40};
41
42use super::Message;
43
44#[derive(Debug)]
46pub enum PreparedStatementLoggingInfo {
47 AlreadyLogged { uuid: Uuid },
51 StillToLog {
54 sql: String,
56 redacted_sql: String,
59 prepared_at: EpochMillis,
61 name: String,
63 session_id: Uuid,
65 accounted: bool,
67 kind: Option<StatementKind>,
69
70 _sealed: sealed::Private,
73 },
74}
75
76impl PreparedStatementLoggingInfo {
77 pub fn still_to_log<A: AstInfo>(
80 raw_sql: String,
81 stmt: Option<&Statement<A>>,
82 prepared_at: EpochMillis,
83 name: String,
84 session_id: Uuid,
85 accounted: bool,
86 ) -> Self {
87 let kind = stmt.map(StatementKind::from);
88 let sql = match kind {
89 Some(
94 StatementKind::CreateSecret
95 | StatementKind::AlterSecret
96 | StatementKind::Insert
97 | StatementKind::Update
98 | StatementKind::Execute,
99 ) => stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
100 _ => raw_sql,
101 };
102
103 PreparedStatementLoggingInfo::StillToLog {
104 sql,
105 redacted_sql: stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
106 prepared_at,
107 name,
108 session_id,
109 accounted,
110 kind,
111 _sealed: sealed::Private,
112 }
113 }
114}
115
116#[derive(Copy, Clone, Debug, Ord, Eq, PartialOrd, PartialEq)]
117pub struct StatementLoggingId(Uuid);
118
119#[derive(Debug)]
120pub(crate) struct PreparedStatementEvent {
121 prepared_statement: Row,
122 sql_text: Row,
123}
124
125#[derive(Debug)]
126pub(crate) struct StatementLogging {
127 executions_begun: BTreeMap<Uuid, StatementBeganExecutionRecord>,
134
135 unlogged_sessions: BTreeMap<Uuid, SessionHistoryEvent>,
139
140 reproducible_rng: rand_chacha::ChaCha8Rng,
144
145 pending_statement_execution_events: Vec<(Row, Diff)>,
146 pending_prepared_statement_events: Vec<PreparedStatementEvent>,
147 pending_session_events: Vec<Row>,
148 pending_statement_lifecycle_events: Vec<Row>,
149
150 now: NowFn,
151
152 tokens: u64,
156 last_logged_ts_seconds: u64,
158 throttled_count: usize,
160}
161
162impl StatementLogging {
163 pub(crate) fn new(now: NowFn) -> Self {
164 let last_logged_ts_seconds = (now)() / 1000;
165 Self {
166 executions_begun: BTreeMap::new(),
167 unlogged_sessions: BTreeMap::new(),
168 reproducible_rng: rand_chacha::ChaCha8Rng::seed_from_u64(42),
169 pending_statement_execution_events: Vec::new(),
170 pending_prepared_statement_events: Vec::new(),
171 pending_session_events: Vec::new(),
172 pending_statement_lifecycle_events: Vec::new(),
173 tokens: 0,
174 last_logged_ts_seconds,
175 now: now.clone(),
176 throttled_count: 0,
177 }
178 }
179
180 fn throttling_check(
185 &mut self,
186 cost: u64,
187 target_data_rate: u64,
188 max_data_credit: Option<u64>,
189 ) -> bool {
190 let ts = (self.now)() / 1000;
191 let elapsed = ts.saturating_sub(self.last_logged_ts_seconds);
194 self.last_logged_ts_seconds = ts;
195 self.tokens = self
196 .tokens
197 .saturating_add(target_data_rate.saturating_mul(elapsed));
198 if let Some(max_data_credit) = max_data_credit {
199 self.tokens = self.tokens.min(max_data_credit);
200 }
201 if let Some(remaining) = self.tokens.checked_sub(cost) {
202 debug!("throttling check passed. tokens remaining: {remaining}; cost: {cost}");
203 self.tokens = remaining;
204 true
205 } else {
206 debug!(
207 "throttling check failed. tokens available: {}; cost: {cost}",
208 self.tokens
209 );
210 false
211 }
212 }
213}
214
215impl Coordinator {
216 pub(crate) fn spawn_statement_logging_task(&self) {
217 let internal_cmd_tx = self.internal_cmd_tx.clone();
218 spawn(|| "statement_logging", async move {
219 let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
224 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
225 loop {
226 interval.tick().await;
227 let _ = internal_cmd_tx.send(Message::DrainStatementLog);
228 }
229 });
230 }
231
232 #[mz_ore::instrument(level = "debug")]
233 pub(crate) fn drain_statement_log(&mut self) {
234 let session_updates = std::mem::take(&mut self.statement_logging.pending_session_events)
235 .into_iter()
236 .map(|update| (update, Diff::ONE))
237 .collect();
238 let (prepared_statement_updates, sql_text_updates) =
239 std::mem::take(&mut self.statement_logging.pending_prepared_statement_events)
240 .into_iter()
241 .map(
242 |PreparedStatementEvent {
243 prepared_statement,
244 sql_text,
245 }| {
246 ((prepared_statement, Diff::ONE), (sql_text, Diff::ONE))
247 },
248 )
249 .unzip::<_, _, Vec<_>, Vec<_>>();
250 let statement_execution_updates =
251 std::mem::take(&mut self.statement_logging.pending_statement_execution_events);
252 let statement_lifecycle_updates =
253 std::mem::take(&mut self.statement_logging.pending_statement_lifecycle_events)
254 .into_iter()
255 .map(|update| (update, Diff::ONE))
256 .collect();
257
258 use IntrospectionType::*;
259 for (type_, updates) in [
260 (SessionHistory, session_updates),
261 (PreparedStatementHistory, prepared_statement_updates),
262 (StatementExecutionHistory, statement_execution_updates),
263 (StatementLifecycleHistory, statement_lifecycle_updates),
264 (SqlText, sql_text_updates),
265 ] {
266 if !updates.is_empty() && !self.controller.read_only() {
267 self.controller
268 .storage
269 .append_introspection_updates(type_, updates);
270 }
271 }
272 }
273
274 fn statement_logging_throttling_check<'a, I>(&mut self, rows: I) -> bool
282 where
283 I: IntoIterator<Item = Option<&'a Row>>,
284 {
285 let cost = rows
286 .into_iter()
287 .filter_map(|row_opt| row_opt.map(|row| row.byte_len()))
288 .fold(0_usize, |acc, x| acc.saturating_add(x));
289
290 let Some(target_data_rate) = self
291 .catalog
292 .system_config()
293 .statement_logging_target_data_rate()
294 else {
295 return true;
296 };
297 let max_data_credit = self
298 .catalog
299 .system_config()
300 .statement_logging_max_data_credit();
301 self.statement_logging.throttling_check(
302 cost.cast_into(),
303 target_data_rate.cast_into(),
304 max_data_credit.map(CastInto::cast_into),
305 )
306 }
307
308 fn record_prepared_statement_as_logged(
311 &self,
312 uuid: Uuid,
313 session: &mut Session,
314 logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
315 ) {
316 let logging = session.qcell_rw(&*logging);
317 if let PreparedStatementLoggingInfo::StillToLog { .. } = logging {
318 *logging = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
319 }
320 }
321
322 pub(crate) fn get_prepared_statement_info(
334 &self,
335 session: &Session,
336 logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
337 ) -> (
338 Option<(StatementPreparedRecord, PreparedStatementEvent)>,
339 Uuid,
340 ) {
341 let logging = session.qcell_ro(&*logging);
342
343 match logging {
344 PreparedStatementLoggingInfo::AlreadyLogged { uuid } => (None, *uuid),
345 PreparedStatementLoggingInfo::StillToLog {
346 sql,
347 redacted_sql,
348 prepared_at,
349 name,
350 session_id,
351 accounted,
352 kind,
353 _sealed: _,
354 } => {
355 assert!(
356 *accounted,
357 "accounting for logging should be done in `begin_statement_execution`"
358 );
359 let uuid = epoch_to_uuid_v7(prepared_at);
360 let sql_hash: [u8; 32] = Sha256::digest(sql.as_bytes()).into();
361 let record = StatementPreparedRecord {
362 id: uuid,
363 sql_hash,
364 name: name.to_string(),
365 session_id: *session_id,
366 prepared_at: *prepared_at,
367 kind: *kind,
368 };
369 let mut mpsh_row = Row::default();
370 let mut mpsh_packer = mpsh_row.packer();
371 Self::pack_statement_prepared_update(&record, &mut mpsh_packer);
372 let sql_row = Row::pack([
373 Datum::TimestampTz(
374 to_datetime(*prepared_at)
375 .truncate_day()
376 .try_into()
377 .expect("must fit"),
378 ),
379 Datum::Bytes(sql_hash.as_slice()),
380 Datum::String(sql.as_str()),
381 Datum::String(redacted_sql.as_str()),
382 ]);
383
384 let throttled_count = self.statement_logging.throttled_count;
385 mpsh_packer.push(Datum::UInt64(throttled_count.try_into().expect("must fit")));
386
387 (
388 Some((
389 record,
390 PreparedStatementEvent {
391 prepared_statement: mpsh_row,
392 sql_text: sql_row,
393 },
394 )),
395 uuid,
396 )
397 }
398 }
399 }
400 pub fn statement_execution_sample_rate(&self, session: &Session) -> f64 {
404 let system: f64 = self
405 .catalog()
406 .system_config()
407 .statement_logging_max_sample_rate()
408 .try_into()
409 .expect("value constrained to be convertible to f64");
410 let user: f64 = session
411 .vars()
412 .get_statement_logging_sample_rate()
413 .try_into()
414 .expect("value constrained to be convertible to f64");
415 f64::min(system, user)
416 }
417
418 pub fn end_statement_execution(
424 &mut self,
425 id: StatementLoggingId,
426 reason: StatementEndedExecutionReason,
427 ) {
428 let StatementLoggingId(uuid) = id;
429 let now = self.now();
430 let ended_record = StatementEndedExecutionRecord {
431 id: uuid,
432 reason,
433 ended_at: now,
434 };
435
436 let began_record = self
437 .statement_logging
438 .executions_begun
439 .remove(&uuid)
440 .expect(
441 "matched `begin_statement_execution` and `end_statement_execution` invocations",
442 );
443 for (row, diff) in
444 Self::pack_statement_ended_execution_updates(&began_record, &ended_record)
445 {
446 self.statement_logging
447 .pending_statement_execution_events
448 .push((row, diff));
449 }
450 self.record_statement_lifecycle_event(
451 &id,
452 &StatementLifecycleEvent::ExecutionFinished,
453 now,
454 );
455 }
456
457 fn pack_statement_execution_inner(
458 record: &StatementBeganExecutionRecord,
459 packer: &mut RowPacker,
460 ) {
461 let StatementBeganExecutionRecord {
462 id,
463 prepared_statement_id,
464 sample_rate,
465 params,
466 began_at,
467 cluster_id,
468 cluster_name,
469 database_name,
470 search_path,
471 application_name,
472 transaction_isolation,
473 execution_timestamp,
474 transaction_id,
475 transient_index_id,
476 mz_version,
477 } = record;
478
479 let cluster = cluster_id.map(|id| id.to_string());
480 let transient_index_id = transient_index_id.map(|id| id.to_string());
481 packer.extend([
482 Datum::Uuid(*id),
483 Datum::Uuid(*prepared_statement_id),
484 Datum::Float64((*sample_rate).into()),
485 match &cluster {
486 None => Datum::Null,
487 Some(cluster_id) => Datum::String(cluster_id),
488 },
489 Datum::String(&*application_name),
490 cluster_name.as_ref().map(String::as_str).into(),
491 Datum::String(database_name),
492 ]);
493 packer.push_list(search_path.iter().map(|s| Datum::String(s)));
494 packer.extend([
495 Datum::String(&*transaction_isolation),
496 (*execution_timestamp).into(),
497 Datum::UInt64(*transaction_id),
498 match &transient_index_id {
499 None => Datum::Null,
500 Some(transient_index_id) => Datum::String(transient_index_id),
501 },
502 ]);
503 packer
504 .try_push_array(
505 &[ArrayDimension {
506 lower_bound: 1,
507 length: params.len(),
508 }],
509 params
510 .iter()
511 .map(|p| Datum::from(p.as_ref().map(String::as_str))),
512 )
513 .expect("correct array dimensions");
514 packer.push(Datum::from(mz_version.as_str()));
515 packer.push(Datum::TimestampTz(
516 to_datetime(*began_at).try_into().expect("Sane system time"),
517 ));
518 }
519
520 fn pack_statement_began_execution_update(record: &StatementBeganExecutionRecord) -> Row {
521 let mut row = Row::default();
522 let mut packer = row.packer();
523 Self::pack_statement_execution_inner(record, &mut packer);
524 packer.extend([
525 Datum::Null,
527 Datum::Null,
529 Datum::Null,
531 Datum::Null,
533 Datum::Null,
535 Datum::Null,
537 ]);
538 row
539 }
540
541 fn pack_statement_prepared_update(record: &StatementPreparedRecord, packer: &mut RowPacker) {
542 let StatementPreparedRecord {
543 id,
544 session_id,
545 name,
546 sql_hash,
547 prepared_at,
548 kind,
549 } = record;
550 packer.extend([
551 Datum::Uuid(*id),
552 Datum::Uuid(*session_id),
553 Datum::String(name.as_str()),
554 Datum::Bytes(sql_hash.as_slice()),
555 Datum::TimestampTz(to_datetime(*prepared_at).try_into().expect("must fit")),
556 kind.map(statement_kind_label_value).into(),
557 ]);
558 }
559
560 fn pack_session_history_update(event: &SessionHistoryEvent) -> Row {
561 let SessionHistoryEvent {
562 id,
563 connected_at,
564 application_name,
565 authenticated_user,
566 } = event;
567 Row::pack_slice(&[
568 Datum::Uuid(*id),
569 Datum::TimestampTz(
570 mz_ore::now::to_datetime(*connected_at)
571 .try_into()
572 .expect("must fit"),
573 ),
574 Datum::String(&*application_name),
575 Datum::String(&*authenticated_user),
576 ])
577 }
578
579 fn pack_statement_lifecycle_event(
580 StatementLoggingId(uuid): &StatementLoggingId,
581 event: &StatementLifecycleEvent,
582 when: EpochMillis,
583 ) -> Row {
584 Row::pack_slice(&[
585 Datum::Uuid(*uuid),
586 Datum::String(event.as_str()),
587 Datum::TimestampTz(mz_ore::now::to_datetime(when).try_into().expect("must fit")),
588 ])
589 }
590
591 pub fn pack_full_statement_execution_update(
592 began_record: &StatementBeganExecutionRecord,
593 ended_record: &StatementEndedExecutionRecord,
594 ) -> Row {
595 let mut row = Row::default();
596 let mut packer = row.packer();
597 Self::pack_statement_execution_inner(began_record, &mut packer);
598 let (status, error_message, result_size, rows_returned, execution_strategy) =
599 match &ended_record.reason {
600 StatementEndedExecutionReason::Success {
601 result_size,
602 rows_returned,
603 execution_strategy,
604 } => (
605 "success",
606 None,
607 result_size.map(|rs| i64::try_from(rs).expect("must fit")),
608 rows_returned.map(|rr| i64::try_from(rr).expect("must fit")),
609 execution_strategy.map(|es| es.name()),
610 ),
611 StatementEndedExecutionReason::Canceled => ("canceled", None, None, None, None),
612 StatementEndedExecutionReason::Errored { error } => {
613 ("error", Some(error.as_str()), None, None, None)
614 }
615 StatementEndedExecutionReason::Aborted => ("aborted", None, None, None, None),
616 };
617 packer.extend([
618 Datum::TimestampTz(
619 to_datetime(ended_record.ended_at)
620 .try_into()
621 .expect("Sane system time"),
622 ),
623 status.into(),
624 error_message.into(),
625 result_size.into(),
626 rows_returned.into(),
627 execution_strategy.into(),
628 ]);
629 row
630 }
631
632 pub fn pack_statement_ended_execution_updates(
633 began_record: &StatementBeganExecutionRecord,
634 ended_record: &StatementEndedExecutionRecord,
635 ) -> [(Row, Diff); 2] {
636 let retraction = Self::pack_statement_began_execution_update(began_record);
637 let new = Self::pack_full_statement_execution_update(began_record, ended_record);
638 [(retraction, Diff::MINUS_ONE), (new, Diff::ONE)]
639 }
640
641 fn mutate_record<F: FnOnce(&mut StatementBeganExecutionRecord)>(
643 &mut self,
644 StatementLoggingId(id): StatementLoggingId,
645 f: F,
646 ) {
647 let record = self
648 .statement_logging
649 .executions_begun
650 .get_mut(&id)
651 .expect("mutate_record must not be called after execution ends");
652 let retraction = Self::pack_statement_began_execution_update(record);
653 self.statement_logging
654 .pending_statement_execution_events
655 .push((retraction, Diff::MINUS_ONE));
656 f(record);
657 let update = Self::pack_statement_began_execution_update(record);
658 self.statement_logging
659 .pending_statement_execution_events
660 .push((update, Diff::ONE));
661 }
662
663 pub fn set_statement_execution_cluster(
665 &mut self,
666 id: StatementLoggingId,
667 cluster_id: ClusterId,
668 ) {
669 let cluster_name = self.catalog().get_cluster(cluster_id).name.clone();
670 self.mutate_record(id, |record| {
671 record.cluster_name = Some(cluster_name);
672 record.cluster_id = Some(cluster_id);
673 });
674 }
675
676 pub fn set_statement_execution_timestamp(
678 &mut self,
679 id: StatementLoggingId,
680 timestamp: Timestamp,
681 ) {
682 self.mutate_record(id, |record| {
683 record.execution_timestamp = Some(u64::from(timestamp));
684 });
685 }
686
687 pub fn set_transient_index_id(&mut self, id: StatementLoggingId, transient_index_id: GlobalId) {
688 self.mutate_record(id, |record| {
689 record.transient_index_id = Some(transient_index_id)
690 });
691 }
692
693 pub fn begin_statement_execution(
700 &mut self,
701 session: &mut Session,
702 params: &Params,
703 logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
704 lifecycle_timestamps: Option<LifecycleTimestamps>,
705 ) -> Option<StatementLoggingId> {
706 let enable_internal_statement_logging = self
707 .catalog()
708 .system_config()
709 .enable_internal_statement_logging();
710 if session.user().is_internal() && !enable_internal_statement_logging {
711 return None;
712 }
713 let sample_rate = self.statement_execution_sample_rate(session);
714
715 let distribution = Bernoulli::new(sample_rate).expect("rate must be in range [0, 1]");
716 let sample = if self
717 .catalog()
718 .system_config()
719 .statement_logging_use_reproducible_rng()
720 {
721 distribution.sample(&mut self.statement_logging.reproducible_rng)
722 } else {
723 distribution.sample(&mut rand::rng())
724 };
725
726 let sampled_label = sample.then_some("true").unwrap_or("false");
730 self.metrics
731 .statement_logging_records
732 .with_label_values(&[sampled_label])
733 .inc_by(1);
734
735 if let Some((sql, accounted)) = match session.qcell_rw(logging) {
736 PreparedStatementLoggingInfo::AlreadyLogged { .. } => None,
737 PreparedStatementLoggingInfo::StillToLog { sql, accounted, .. } => {
738 Some((sql, accounted))
739 }
740 } {
741 if !*accounted {
742 self.metrics
743 .statement_logging_unsampled_bytes
744 .inc_by(u64::cast_from(sql.len()));
745 if sample {
746 self.metrics
747 .statement_logging_actual_bytes
748 .inc_by(u64::cast_from(sql.len()));
749 }
750 *accounted = true;
751 }
752 }
753 if !sample {
754 return None;
755 }
756
757 let (maybe_ps, ps_uuid) = self.get_prepared_statement_info(session, logging);
758
759 let began_at = if let Some(lifecycle_timestamps) = lifecycle_timestamps {
760 lifecycle_timestamps.received
761 } else {
762 self.now()
763 };
764 let now = self.now();
765 let execution_uuid = epoch_to_uuid_v7(&now);
766
767 let params = std::iter::zip(params.execute_types.iter(), params.datums.iter())
768 .map(|(r#type, datum)| {
769 mz_pgrepr::Value::from_datum(datum, r#type).map(|val| {
770 let mut buf = BytesMut::new();
771 val.encode_text(&mut buf);
772 String::from_utf8(Into::<Vec<u8>>::into(buf))
773 .expect("Serialization shouldn't produce non-UTF-8 strings.")
774 })
775 })
776 .collect();
777 let record = StatementBeganExecutionRecord {
778 id: execution_uuid,
779 prepared_statement_id: ps_uuid,
780 sample_rate,
781 params,
782 began_at,
783 application_name: session.application_name().to_string(),
784 transaction_isolation: session.vars().transaction_isolation().to_string(),
785 transaction_id: session
786 .transaction()
787 .inner()
788 .expect("Every statement runs in an explicit or implicit transaction")
789 .id,
790 mz_version: self
791 .catalog()
792 .state()
793 .config()
794 .build_info
795 .human_version(None),
796 cluster_id: None,
798 cluster_name: None,
799 execution_timestamp: None,
800 transient_index_id: None,
801 database_name: session.vars().database().into(),
802 search_path: session
803 .vars()
804 .search_path()
805 .iter()
806 .map(|s| s.as_str().to_string())
807 .collect(),
808 };
809 let mseh_update = Self::pack_statement_began_execution_update(&record);
810
811 let (maybe_ps_event, maybe_sh_event) = if let Some((ps_record, ps_event)) = maybe_ps {
812 if let Some(sh) = self
813 .statement_logging
814 .unlogged_sessions
815 .get(&ps_record.session_id)
816 {
817 (
818 Some(ps_event),
819 Some((Self::pack_session_history_update(sh), ps_record.session_id)),
820 )
821 } else {
822 (Some(ps_event), None)
823 }
824 } else {
825 (None, None)
826 };
827
828 let maybe_ps_prepared_statement = maybe_ps_event.as_ref().map(|e| &e.prepared_statement);
829 let maybe_ps_sql_text = maybe_ps_event.as_ref().map(|e| &e.sql_text);
830
831 if !self.statement_logging_throttling_check([
832 Some(&mseh_update),
833 maybe_ps_prepared_statement,
834 maybe_ps_sql_text,
835 maybe_sh_event.as_ref().map(|(row, _)| row),
836 ]) {
837 self.statement_logging.throttled_count += 1;
838 return None;
839 }
840 else if let PreparedStatementLoggingInfo::StillToLog { .. } = session.qcell_ro(logging) {
845 self.statement_logging.throttled_count = 0;
846 }
847
848 self.record_prepared_statement_as_logged(ps_uuid, session, logging);
849
850 self.record_statement_lifecycle_event(
851 &StatementLoggingId(execution_uuid),
852 &StatementLifecycleEvent::ExecutionBegan,
853 began_at,
854 );
855
856 self.statement_logging
857 .pending_statement_execution_events
858 .push((mseh_update, Diff::ONE));
859 self.statement_logging
860 .executions_begun
861 .insert(execution_uuid, record);
862
863 if let Some((sh_update, session_id)) = maybe_sh_event {
864 self.statement_logging
865 .pending_session_events
866 .push(sh_update);
867 self.statement_logging.unlogged_sessions.remove(&session_id);
869 }
870 if let Some(ps_event) = maybe_ps_event {
871 self.statement_logging
872 .pending_prepared_statement_events
873 .push(ps_event);
874 }
875 Some(StatementLoggingId(execution_uuid))
876 }
877
878 pub fn begin_session_for_statement_logging(&mut self, session: &ConnMeta) {
880 let id = session.uuid();
881 let session_role = session.authenticated_role_id();
882 let event = SessionHistoryEvent {
883 id,
884 connected_at: session.connected_at(),
885 application_name: session.application_name().to_owned(),
886 authenticated_user: self.catalog.get_role(session_role).name.clone(),
887 };
888 self.statement_logging.unlogged_sessions.insert(id, event);
889 }
890
891 pub fn end_session_for_statement_logging(&mut self, uuid: Uuid) {
892 self.statement_logging.unlogged_sessions.remove(&uuid);
893 }
894
895 pub fn record_statement_lifecycle_event(
896 &mut self,
897 id: &StatementLoggingId,
898 event: &StatementLifecycleEvent,
899 when: EpochMillis,
900 ) {
901 if mz_adapter_types::dyncfgs::ENABLE_STATEMENT_LIFECYCLE_LOGGING
902 .get(self.catalog().system_config().dyncfgs())
903 {
904 let row = Self::pack_statement_lifecycle_event(id, event, when);
905 self.statement_logging
906 .pending_statement_lifecycle_events
907 .push(row);
908 }
909 }
910}
911
/// Private module holding a marker type that cannot be named from outside the enclosing
/// module.
mod sealed {
    /// Zero-sized marker. A struct or enum variant with a field of this type can only be
    /// built by code that is able to name `sealed::Private`.
    #[derive(Debug, Copy, Clone)]
    pub struct Private;
}