use std::collections::BTreeMap;
use std::sync::Arc;

use bytes::BytesMut;
use mz_controller_types::ClusterId;
use mz_ore::now::{NowFn, epoch_to_uuid_v7, to_datetime};
use mz_ore::task::spawn;
use mz_ore::{cast::CastFrom, cast::CastInto, now::EpochMillis};
use mz_repr::adt::array::ArrayDimension;
use mz_repr::adt::timestamp::TimestampLike;
use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{AstInfo, Statement};
use mz_sql::plan::Params;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql_parser::ast::{StatementKind, statement_kind_label_value};
use mz_storage_client::controller::IntrospectionType;
use qcell::QCell;
use rand::SeedableRng;
use rand::{distributions::Bernoulli, prelude::Distribution, thread_rng};
use sha2::{Digest, Sha256};
use tokio::time::MissedTickBehavior;
use tracing::debug;
use uuid::Uuid;

use crate::coord::{ConnMeta, Coordinator};
use crate::session::{LifecycleTimestamps, Session};
use crate::statement_logging::{
    SessionHistoryEvent, StatementBeganExecutionRecord, StatementEndedExecutionReason,
    StatementEndedExecutionRecord, StatementLifecycleEvent, StatementPreparedRecord,
};

use super::Message;

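/// State needed to log a prepared statement to the prepared-statement history
/// introspection collection the first time one of its executions is sampled,
/// and to avoid logging it more than once.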
#[derive(Debug)]
pub enum PreparedStatementLoggingInfo {
    /// The statement has already been logged; executions can reference it by `uuid`.
    AlreadyLogged { uuid: Uuid },
    /// The statement has not yet been logged; it will be logged the first time
    /// one of its executions is sampled.
    StillToLog {
        /// The SQL text of the statement (already redacted for statement kinds
        /// that may contain sensitive literals).
        sql: String,
        /// The redacted SQL text of the statement.
        redacted_sql: String,
        /// When the statement was prepared, in milliseconds since the Unix epoch.
        prepared_at: EpochMillis,
        /// The name under which the statement was prepared.
        name: String,
        /// The ID of the session that prepared the statement.
        session_id: Uuid,
        /// Whether the statement's text has already been counted toward the
        /// statement logging metrics.
        accounted: bool,
        /// The kind of the statement, if known.
        kind: Option<StatementKind>,

        /// Zero-sized private type, forcing construction to go through
        /// [`PreparedStatementLoggingInfo::still_to_log`].
        _sealed: sealed::Private,
    },
}

impl PreparedStatementLoggingInfo {
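    /// Builds a [`PreparedStatementLoggingInfo::StillToLog`] from the raw and
    /// (optionally) parsed SQL, redacting the raw text for statement kinds
    /// that may contain sensitive literals.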
    pub fn still_to_log<A: AstInfo>(
        raw_sql: String,
        stmt: Option<&Statement<A>>,
        prepared_at: EpochMillis,
        name: String,
        session_id: Uuid,
        accounted: bool,
    ) -> Self {
        let kind = stmt.map(StatementKind::from);
        let sql = match kind {
            // Statements of these kinds may contain sensitive values (secrets,
            // inserted or updated data, EXECUTE parameters), so log only the
            // redacted form of their SQL text.
            Some(
                StatementKind::CreateSecret
                | StatementKind::AlterSecret
                | StatementKind::Insert
                | StatementKind::Update
                | StatementKind::Execute,
            ) => stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
            _ => raw_sql,
        };

        PreparedStatementLoggingInfo::StillToLog {
            sql,
            redacted_sql: stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
            prepared_at,
            name,
            session_id,
            accounted,
            kind,
            _sealed: sealed::Private,
        }
    }
}

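/// An opaque identifier for a logged statement execution, used to correlate
/// later events (completion, lifecycle events, metadata updates) with the
/// execution's record.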
#[derive(Copy, Clone, Debug, Ord, Eq, PartialOrd, PartialEq)]
pub struct StatementLoggingId(Uuid);

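/// The pair of rows emitted when a prepared statement is first logged: one for
/// the prepared statement history and one for the SQL text collection.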
#[derive(Debug)]
pub(crate) struct PreparedStatementEvent {
    prepared_statement: Row,
    sql_text: Row,
}

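/// Coordinator-side state for statement logging: executions in progress,
/// sessions not yet logged, pending introspection updates, and the
/// token-bucket state used for throttling.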
#[derive(Debug)]
pub(crate) struct StatementLogging {
    /// Executions that have begun but not yet ended. The "began" record is
    /// kept so it can be retracted and replaced by the full record when the
    /// execution finishes.
    executions_begun: BTreeMap<Uuid, StatementBeganExecutionRecord>,

    /// Sessions that have connected but whose session history has not yet
    /// been logged. An entry is removed (and logged) the first time one of
    /// the session's prepared statements is logged.
    unlogged_sessions: BTreeMap<Uuid, SessionHistoryEvent>,

    /// A reproducible RNG for sampling, used instead of the thread-local RNG
    /// when `statement_logging_use_reproducible_rng` is enabled.
    reproducible_rng: rand_chacha::ChaCha8Rng,

    /// Updates that have been produced but not yet written to the
    /// introspection collections by `drain_statement_log`.
    pending_statement_execution_events: Vec<(Row, Diff)>,
    pending_prepared_statement_events: Vec<PreparedStatementEvent>,
    pending_session_events: Vec<Row>,
    pending_statement_lifecycle_events: Vec<Row>,

    now: NowFn,

    /// Token-bucket state for throttling: the number of bytes of credit
    /// currently available for logging.
    tokens: u64,
    /// The time of the last throttling check, in seconds since the Unix
    /// epoch, used to compute how much credit to replenish.
    last_logged_ts_seconds: u64,
    /// The number of statements dropped by throttling since the last one
    /// that was successfully logged.
    throttled_count: usize,
}

impl StatementLogging {
    pub(crate) fn new(now: NowFn) -> Self {
        let last_logged_ts_seconds = (now)() / 1000;
        Self {
            executions_begun: BTreeMap::new(),
            unlogged_sessions: BTreeMap::new(),
            reproducible_rng: rand_chacha::ChaCha8Rng::seed_from_u64(42),
            pending_statement_execution_events: Vec::new(),
            pending_prepared_statement_events: Vec::new(),
            pending_session_events: Vec::new(),
            pending_statement_lifecycle_events: Vec::new(),
            tokens: 0,
            last_logged_ts_seconds,
            now: now.clone(),
            throttled_count: 0,
        }
    }

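    /// Checks whether a logging event of size `cost` (in bytes) may be logged
    /// without exceeding the target data rate, using a token bucket that
    /// replenishes `target_data_rate` bytes of credit per second, up to an
    /// optional cap of `max_data_credit` bytes.
    ///
    /// Returns `None` if the event must be dropped, or `Some(n)`, where `n` is
    /// the number of events dropped by throttling since the last event that
    /// passed the check.
    ///
    /// ```ignore
    /// // Hypothetical usage sketch: a 100-byte event against a budget of
    /// // 10 bytes per second and at most 1000 bytes of accumulated credit.
    /// if let Some(dropped) = logging.throttling_check(100, 10, Some(1000)) {
    ///     // Log the event, noting `dropped` previously throttled events.
    /// }
    /// ```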
    fn throttling_check(
        &mut self,
        cost: u64,
        target_data_rate: u64,
        max_data_credit: Option<u64>,
    ) -> Option<usize> {
        let ts = (self.now)() / 1000;
        let elapsed = ts.saturating_sub(self.last_logged_ts_seconds);
        self.last_logged_ts_seconds = ts;
        // Replenish credit in proportion to the time elapsed since the last
        // check, capped at the configured maximum.
        self.tokens = self
            .tokens
            .saturating_add(target_data_rate.saturating_mul(elapsed));
        if let Some(max_data_credit) = max_data_credit {
            self.tokens = self.tokens.min(max_data_credit);
        }
        if let Some(remaining) = self.tokens.checked_sub(cost) {
            debug!("throttling check passed. tokens remaining: {remaining}; cost: {cost}");
            self.tokens = remaining;
            Some(std::mem::take(&mut self.throttled_count))
        } else {
            debug!(
                "throttling check failed. tokens available: {}; cost: {cost}",
                self.tokens
            );
            self.throttled_count += 1;
            None
        }
    }
}

impl Coordinator {
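    /// Spawns a background task that periodically asks the coordinator to
    /// drain any pending statement log updates into the introspection
    /// collections.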
    pub(crate) fn spawn_statement_logging_task(&self) {
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        spawn(|| "statement_logging", async move {
            // Ask the coordinator to drain the statement log every five
            // seconds, skipping missed ticks rather than bursting.
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            loop {
                interval.tick().await;
                let _ = internal_cmd_tx.send(Message::DrainStatementLog);
            }
        });
    }

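    /// Flushes all pending statement logging updates (session, prepared
    /// statement, SQL text, execution, and lifecycle events) to the
    /// corresponding storage-managed introspection collections.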
    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn drain_statement_log(&mut self) {
        let session_updates = std::mem::take(&mut self.statement_logging.pending_session_events)
            .into_iter()
            .map(|update| (update, Diff::ONE))
            .collect();
        let (prepared_statement_updates, sql_text_updates) =
            std::mem::take(&mut self.statement_logging.pending_prepared_statement_events)
                .into_iter()
                .map(
                    |PreparedStatementEvent {
                         prepared_statement,
                         sql_text,
                     }| {
                        ((prepared_statement, Diff::ONE), (sql_text, Diff::ONE))
                    },
                )
                .unzip::<_, _, Vec<_>, Vec<_>>();
        let statement_execution_updates =
            std::mem::take(&mut self.statement_logging.pending_statement_execution_events);
        let statement_lifecycle_updates =
            std::mem::take(&mut self.statement_logging.pending_statement_lifecycle_events)
                .into_iter()
                .map(|update| (update, Diff::ONE))
                .collect();

        use IntrospectionType::*;
        for (type_, updates) in [
            (SessionHistory, session_updates),
            (PreparedStatementHistory, prepared_statement_updates),
            (StatementExecutionHistory, statement_execution_updates),
            (StatementLifecycleHistory, statement_lifecycle_updates),
            (SqlText, sql_text_updates),
        ] {
            // In read-only mode, pending updates are dropped rather than
            // written to the introspection collections.
            if !updates.is_empty() && !self.controller.read_only() {
                self.controller
                    .storage
                    .append_introspection_updates(type_, updates);
            }
        }
    }

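    /// Performs the throttling check for an event of size `cost` (in bytes),
    /// based on the `statement_logging_target_data_rate` and
    /// `statement_logging_max_data_credit` system configuration values.
    ///
    /// Returns `None` if logging should be throttled, or `Some(n)`, where `n`
    /// is the number of statements throttled since the last successful check.
    /// If no target data rate is configured, throttling is disabled.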
    fn statement_logging_throttling_check(&mut self, cost: usize) -> Option<usize> {
        let Some(target_data_rate) = self
            .catalog
            .system_config()
            .statement_logging_target_data_rate()
        else {
            return Some(std::mem::take(&mut self.statement_logging.throttled_count));
        };
        let max_data_credit = self
            .catalog
            .system_config()
            .statement_logging_max_data_credit();
        self.statement_logging.throttling_check(
            cost.cast_into(),
            target_data_rate.cast_into(),
            max_data_credit.map(CastInto::cast_into),
        )
    }

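    /// Returns the logging events for a prepared statement, if it has not yet
    /// been logged, along with the UUID under which it is (or already was)
    /// logged, and transitions its state to `AlreadyLogged`.
    ///
    /// The sampling decision is assumed to have been made by the caller; this
    /// function only performs the throttling check, and returns `None` if the
    /// statement must be throttled.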
    pub(crate) fn log_prepared_statement(
        &mut self,
        session: &mut Session,
        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
    ) -> Option<(
        Option<(StatementPreparedRecord, PreparedStatementEvent)>,
        Uuid,
    )> {
        let logging = session.qcell_rw(&*logging);
        let mut out = None;

        let uuid = match logging {
            PreparedStatementLoggingInfo::AlreadyLogged { uuid } => *uuid,
            PreparedStatementLoggingInfo::StillToLog {
                sql,
                redacted_sql,
                prepared_at,
                name,
                session_id,
                accounted,
                kind,
                _sealed: _,
            } => {
                assert!(
                    *accounted,
                    "accounting for logging should be done in `begin_statement_execution`"
                );
                let uuid = epoch_to_uuid_v7(prepared_at);
                let sql = std::mem::take(sql);
                let redacted_sql = std::mem::take(redacted_sql);
                let sql_hash: [u8; 32] = Sha256::digest(sql.as_bytes()).into();
                let record = StatementPreparedRecord {
                    id: uuid,
                    sql_hash,
                    name: std::mem::take(name),
                    session_id: *session_id,
                    prepared_at: *prepared_at,
                    kind: *kind,
                };
                let mut mpsh_row = Row::default();
                let mut mpsh_packer = mpsh_row.packer();
                Self::pack_statement_prepared_update(&record, &mut mpsh_packer);
                let sql_row = Row::pack([
                    Datum::TimestampTz(
                        to_datetime(*prepared_at)
                            .truncate_day()
                            .try_into()
                            .expect("must fit"),
                    ),
                    Datum::Bytes(sql_hash.as_slice()),
                    Datum::String(sql.as_str()),
                    Datum::String(redacted_sql.as_str()),
                ]);

                // The throttling check is charged the total size of the rows
                // we are about to log.
                let cost = mpsh_packer.byte_len() + sql_row.byte_len();
                let throttled_count = self.statement_logging_throttling_check(cost)?;
                mpsh_packer.push(Datum::UInt64(throttled_count.try_into().expect("must fit")));
                out = Some((
                    record,
                    PreparedStatementEvent {
                        prepared_statement: mpsh_row,
                        sql_text: sql_row,
                    },
                ));

                *logging = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
                uuid
            }
        };
        Some((out, uuid))
    }
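
    /// Returns the statement execution sampling rate for this session: the
    /// minimum of the system-wide maximum sample rate and the session's
    /// statement logging sample rate variable.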
    pub fn statement_execution_sample_rate(&self, session: &Session) -> f64 {
        let system: f64 = self
            .catalog()
            .system_config()
            .statement_logging_max_sample_rate()
            .try_into()
            .expect("value constrained to be convertible to f64");
        let user: f64 = session
            .vars()
            .get_statement_logging_sample_rate()
            .try_into()
            .expect("value constrained to be convertible to f64");
        f64::min(system, user)
    }

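    /// Records the end of a statement execution: retracts the execution's
    /// "began" row, emits the completed row, and records an
    /// `ExecutionFinished` lifecycle event.
    ///
    /// Must be paired with a preceding `begin_statement_execution` call that
    /// returned the same `StatementLoggingId`.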
    pub fn end_statement_execution(
        &mut self,
        id: StatementLoggingId,
        reason: StatementEndedExecutionReason,
    ) {
        let StatementLoggingId(uuid) = id;
        let now = self.now();
        let ended_record = StatementEndedExecutionRecord {
            id: uuid,
            reason,
            ended_at: now,
        };

        let began_record = self
            .statement_logging
            .executions_begun
            .remove(&uuid)
            .expect(
                "matched `begin_statement_execution` and `end_statement_execution` invocations",
            );
        for (row, diff) in
            Self::pack_statement_ended_execution_updates(&began_record, &ended_record)
        {
            self.statement_logging
                .pending_statement_execution_events
                .push((row, diff));
        }
        self.record_statement_lifecycle_event(
            &id,
            &StatementLifecycleEvent::ExecutionFinished,
            now,
        );
    }

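    /// Packs the columns shared by the "began" and completed forms of a
    /// statement execution row, in the order expected by the statement
    /// execution history collection.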
    fn pack_statement_execution_inner(
        record: &StatementBeganExecutionRecord,
        packer: &mut RowPacker,
    ) {
        let StatementBeganExecutionRecord {
            id,
            prepared_statement_id,
            sample_rate,
            params,
            began_at,
            cluster_id,
            cluster_name,
            database_name,
            search_path,
            application_name,
            transaction_isolation,
            execution_timestamp,
            transaction_id,
            transient_index_id,
            mz_version,
        } = record;

        let cluster = cluster_id.map(|id| id.to_string());
        let transient_index_id = transient_index_id.map(|id| id.to_string());
        packer.extend([
            Datum::Uuid(*id),
            Datum::Uuid(*prepared_statement_id),
            Datum::Float64((*sample_rate).into()),
            match &cluster {
                None => Datum::Null,
                Some(cluster_id) => Datum::String(cluster_id),
            },
            Datum::String(&*application_name),
            cluster_name.as_ref().map(String::as_str).into(),
            Datum::String(database_name),
        ]);
        packer.push_list(search_path.iter().map(|s| Datum::String(s)));
        packer.extend([
            Datum::String(&*transaction_isolation),
            (*execution_timestamp).into(),
            Datum::UInt64(*transaction_id),
            match &transient_index_id {
                None => Datum::Null,
                Some(transient_index_id) => Datum::String(transient_index_id),
            },
        ]);
        packer
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: params.len(),
                }],
                params
                    .iter()
                    .map(|p| Datum::from(p.as_ref().map(String::as_str))),
            )
            .expect("correct array dimensions");
        packer.push(Datum::from(mz_version.as_str()));
        packer.push(Datum::TimestampTz(
            to_datetime(*began_at).try_into().expect("Sane system time"),
        ));
    }

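    /// Packs a row for an execution that has begun but not yet finished; the
    /// columns describing the outcome are left null.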
    fn pack_statement_began_execution_update(record: &StatementBeganExecutionRecord) -> Row {
        let mut row = Row::default();
        let mut packer = row.packer();
        Self::pack_statement_execution_inner(record, &mut packer);
        packer.extend([
            // ended_at
            Datum::Null,
            // status
            Datum::Null,
            // error_message
            Datum::Null,
            // result_size
            Datum::Null,
            // rows_returned
            Datum::Null,
            // execution_strategy
            Datum::Null,
        ]);
        row
    }

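    /// Packs the prepared statement history columns for `record`. The caller
    /// appends the final throttled-count column once the throttling check has
    /// passed.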
    fn pack_statement_prepared_update(record: &StatementPreparedRecord, packer: &mut RowPacker) {
        let StatementPreparedRecord {
            id,
            session_id,
            name,
            sql_hash,
            prepared_at,
            kind,
        } = record;
        packer.extend([
            Datum::Uuid(*id),
            Datum::Uuid(*session_id),
            Datum::String(name.as_str()),
            Datum::Bytes(sql_hash.as_slice()),
            Datum::TimestampTz(to_datetime(*prepared_at).try_into().expect("must fit")),
            kind.map(statement_kind_label_value).into(),
        ]);
    }

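    /// Packs a session history row for the given session event.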
    fn pack_session_history_update(event: &SessionHistoryEvent) -> Row {
        let SessionHistoryEvent {
            id,
            connected_at,
            application_name,
            authenticated_user,
        } = event;
        Row::pack_slice(&[
            Datum::Uuid(*id),
            Datum::TimestampTz(
                mz_ore::now::to_datetime(*connected_at)
                    .try_into()
                    .expect("must fit"),
            ),
            Datum::String(&*application_name),
            Datum::String(&*authenticated_user),
        ])
    }

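    /// Packs a statement lifecycle history row recording that `event` happened
    /// to the execution identified by `uuid` at time `when`.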
    fn pack_statement_lifecycle_event(
        StatementLoggingId(uuid): &StatementLoggingId,
        event: &StatementLifecycleEvent,
        when: EpochMillis,
    ) -> Row {
        Row::pack_slice(&[
            Datum::Uuid(*uuid),
            Datum::String(event.as_str()),
            Datum::TimestampTz(mz_ore::now::to_datetime(when).try_into().expect("must fit")),
        ])
    }

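    /// Packs a complete statement execution history row, combining the "began"
    /// columns with the outcome recorded when the execution ended.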
    pub fn pack_full_statement_execution_update(
        began_record: &StatementBeganExecutionRecord,
        ended_record: &StatementEndedExecutionRecord,
    ) -> Row {
        let mut row = Row::default();
        let mut packer = row.packer();
        Self::pack_statement_execution_inner(began_record, &mut packer);
        let (status, error_message, result_size, rows_returned, execution_strategy) =
            match &ended_record.reason {
                StatementEndedExecutionReason::Success {
                    result_size,
                    rows_returned,
                    execution_strategy,
                } => (
                    "success",
                    None,
                    result_size.map(|rs| i64::try_from(rs).expect("must fit")),
                    rows_returned.map(|rr| i64::try_from(rr).expect("must fit")),
                    execution_strategy.map(|es| es.name()),
                ),
                StatementEndedExecutionReason::Canceled => ("canceled", None, None, None, None),
                StatementEndedExecutionReason::Errored { error } => {
                    ("error", Some(error.as_str()), None, None, None)
                }
                StatementEndedExecutionReason::Aborted => ("aborted", None, None, None, None),
            };
        packer.extend([
            Datum::TimestampTz(
                to_datetime(ended_record.ended_at)
                    .try_into()
                    .expect("Sane system time"),
            ),
            status.into(),
            error_message.into(),
            result_size.into(),
            rows_returned.into(),
            execution_strategy.into(),
        ]);
        row
    }

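    /// Returns the pair of updates produced when an execution ends: a
    /// retraction of the "began" row and an insertion of the completed row.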
    pub fn pack_statement_ended_execution_updates(
        began_record: &StatementBeganExecutionRecord,
        ended_record: &StatementEndedExecutionRecord,
    ) -> [(Row, Diff); 2] {
        let retraction = Self::pack_statement_began_execution_update(began_record);
        let new = Self::pack_full_statement_execution_update(began_record, ended_record);
        [(retraction, Diff::MINUS_ONE), (new, Diff::ONE)]
    }

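    /// Mutates an in-progress execution record via `f`, queuing a retraction
    /// of the old "began" row and an insertion of the updated one.
    ///
    /// Must not be called after the execution has ended.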
    fn mutate_record<F: FnOnce(&mut StatementBeganExecutionRecord)>(
        &mut self,
        StatementLoggingId(id): StatementLoggingId,
        f: F,
    ) {
        let record = self
            .statement_logging
            .executions_begun
            .get_mut(&id)
            .expect("mutate_record must not be called after execution ends");
        let retraction = Self::pack_statement_began_execution_update(record);
        self.statement_logging
            .pending_statement_execution_events
            .push((retraction, Diff::MINUS_ONE));
        f(record);
        let update = Self::pack_statement_began_execution_update(record);
        self.statement_logging
            .pending_statement_execution_events
            .push((update, Diff::ONE));
    }

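    /// Records the cluster on which a statement execution is being run.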
    pub fn set_statement_execution_cluster(
        &mut self,
        id: StatementLoggingId,
        cluster_id: ClusterId,
    ) {
        let cluster_name = self.catalog().get_cluster(cluster_id).name.clone();
        self.mutate_record(id, |record| {
            record.cluster_name = Some(cluster_name);
            record.cluster_id = Some(cluster_id);
        });
    }

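    /// Records the timestamp chosen for a statement execution.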
    pub fn set_statement_execution_timestamp(
        &mut self,
        id: StatementLoggingId,
        timestamp: Timestamp,
    ) {
        self.mutate_record(id, |record| {
            record.execution_timestamp = Some(u64::from(timestamp));
        });
    }

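    /// Records the transient index (if any) created to run a statement
    /// execution.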
    pub fn set_transient_index_id(&mut self, id: StatementLoggingId, transient_index_id: GlobalId) {
        self.mutate_record(id, |record| {
            record.transient_index_id = Some(transient_index_id)
        });
    }

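    /// Possibly begins logging a statement execution, depending on the
    /// session's sampling rate and the logging throttle.
    ///
    /// Returns a `StatementLoggingId` that must later be passed to
    /// `end_statement_execution`, or `None` if this execution is not being
    /// logged.
    ///
    /// ```ignore
    /// // Hypothetical sketch of the expected pairing.
    /// if let Some(id) = coord.begin_statement_execution(session, &params, &logging, None) {
    ///     // ... execute the statement ...
    ///     coord.end_statement_execution(id, reason);
    /// }
    /// ```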
    pub fn begin_statement_execution(
        &mut self,
        session: &mut Session,
        params: &Params,
        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
        lifecycle_timestamps: Option<LifecycleTimestamps>,
    ) -> Option<StatementLoggingId> {
        let enable_internal_statement_logging = self
            .catalog()
            .system_config()
            .enable_internal_statement_logging();
        if session.user().is_internal() && !enable_internal_statement_logging {
            return None;
        }
        let sample_rate = self.statement_execution_sample_rate(session);

        let distribution = Bernoulli::new(sample_rate).expect("rate must be in range [0, 1]");
        let sample = if self
            .catalog()
            .system_config()
            .statement_logging_use_reproducible_rng()
        {
            distribution.sample(&mut self.statement_logging.reproducible_rng)
        } else {
            distribution.sample(&mut thread_rng())
        };

        // Count every statement against the sampling metrics, whether or not
        // it was chosen by the sampler.
        let sampled_label = sample.then_some("true").unwrap_or("false");
        self.metrics
            .statement_logging_records
            .with_label_values(&[sampled_label])
            .inc_by(1);

        if let Some((sql, accounted)) = match session.qcell_rw(logging) {
            PreparedStatementLoggingInfo::AlreadyLogged { .. } => None,
            PreparedStatementLoggingInfo::StillToLog { sql, accounted, .. } => {
                Some((sql, accounted))
            }
        } {
            if !*accounted {
                self.metrics
                    .statement_logging_unsampled_bytes
                    .with_label_values(&[])
                    .inc_by(u64::cast_from(sql.len()));
                if sample {
                    self.metrics
                        .statement_logging_actual_bytes
                        .with_label_values(&[])
                        .inc_by(u64::cast_from(sql.len()));
                }
                *accounted = true;
            }
        }
        if !sample {
            return None;
        }
        let (ps_record, ps_uuid) = self.log_prepared_statement(session, logging)?;

        let began_at = if let Some(lifecycle_timestamps) = lifecycle_timestamps {
            lifecycle_timestamps.received
        } else {
            self.now()
        };
        let now = self.now();
        let execution_uuid = epoch_to_uuid_v7(&now);
        self.record_statement_lifecycle_event(
            &StatementLoggingId(execution_uuid),
            &StatementLifecycleEvent::ExecutionBegan,
            began_at,
        );

        let params = std::iter::zip(params.execute_types.iter(), params.datums.iter())
            .map(|(r#type, datum)| {
                mz_pgrepr::Value::from_datum(datum, r#type).map(|val| {
                    let mut buf = BytesMut::new();
                    val.encode_text(&mut buf);
                    String::from_utf8(Into::<Vec<u8>>::into(buf))
                        .expect("Serialization shouldn't produce non-UTF-8 strings.")
                })
            })
            .collect();
        let record = StatementBeganExecutionRecord {
            id: execution_uuid,
            prepared_statement_id: ps_uuid,
            sample_rate,
            params,
            began_at,
            application_name: session.application_name().to_string(),
            transaction_isolation: session.vars().transaction_isolation().to_string(),
            transaction_id: session
                .transaction()
                .inner()
                .expect("Every statement runs in an explicit or implicit transaction")
                .id,
            mz_version: self
                .catalog()
                .state()
                .config()
                .build_info
                .human_version(None),
            // These fields are not yet known; they are filled in later via
            // `mutate_record`.
            cluster_id: None,
            cluster_name: None,
            execution_timestamp: None,
            transient_index_id: None,
            database_name: session.vars().database().into(),
            search_path: session
                .vars()
                .search_path()
                .iter()
                .map(|s| s.as_str().to_string())
                .collect(),
        };
        let mseh_update = Self::pack_statement_began_execution_update(&record);
        self.statement_logging
            .pending_statement_execution_events
            .push((mseh_update, Diff::ONE));
        self.statement_logging
            .executions_begun
            .insert(execution_uuid, record);
        if let Some((ps_record, ps_update)) = ps_record {
            self.statement_logging
                .pending_prepared_statement_events
                .push(ps_update);
            // Now that this session has a logged statement, log its session
            // history entry too (if it hasn't been already).
            if let Some(sh) = self
                .statement_logging
                .unlogged_sessions
                .remove(&ps_record.session_id)
            {
                let sh_update = Self::pack_session_history_update(&sh);
                self.statement_logging
                    .pending_session_events
                    .push(sh_update);
            }
        }
        Some(StatementLoggingId(execution_uuid))
    }

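    /// Registers a newly connected session so that its session history entry
    /// can be logged the first time one of its statements is logged.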
    pub fn begin_session_for_statement_logging(&mut self, session: &ConnMeta) {
        let id = session.uuid();
        let session_role = session.authenticated_role_id();
        let event = SessionHistoryEvent {
            id,
            connected_at: session.connected_at(),
            application_name: session.application_name().to_owned(),
            authenticated_user: self.catalog.get_role(session_role).name.clone(),
        };
        self.statement_logging.unlogged_sessions.insert(id, event);
    }

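    /// Drops any pending session history state for a session that has
    /// disconnected.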
    pub fn end_session_for_statement_logging(&mut self, uuid: Uuid) {
        self.statement_logging.unlogged_sessions.remove(&uuid);
    }

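    /// Queues a statement lifecycle event for the given execution, if
    /// statement lifecycle logging is enabled.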
    pub fn record_statement_lifecycle_event(
        &mut self,
        id: &StatementLoggingId,
        event: &StatementLifecycleEvent,
        when: EpochMillis,
    ) {
        if mz_adapter_types::dyncfgs::ENABLE_STATEMENT_LIFECYCLE_LOGGING
            .get(self.catalog().system_config().dyncfgs())
        {
            let row = Self::pack_statement_lifecycle_event(id, event, when);
            self.statement_logging
                .pending_statement_lifecycle_events
                .push(row);
        }
    }
}

mod sealed {
    /// A private, zero-sized type that prevents construction of
    /// [`PreparedStatementLoggingInfo`](super::PreparedStatementLoggingInfo)
    /// variants outside of this module's own constructors.
    #[derive(Debug, Copy, Clone)]
    pub struct Private;
}