1use std::collections::BTreeMap;
11use std::sync::Arc;
12
13use bytes::BytesMut;
14use mz_controller_types::ClusterId;
15use mz_ore::now::{NowFn, epoch_to_uuid_v7, to_datetime};
16use mz_ore::task::spawn;
17use mz_ore::{cast::CastFrom, cast::CastInto, now::EpochMillis};
18use mz_repr::adt::array::ArrayDimension;
19use mz_repr::adt::timestamp::TimestampLike;
20use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker, Timestamp};
21use mz_sql::ast::display::AstDisplay;
22use mz_sql::ast::{AstInfo, Statement};
23use mz_sql::plan::Params;
24use mz_sql::session::metadata::SessionMetadata;
25use mz_sql_parser::ast::{StatementKind, statement_kind_label_value};
26use mz_storage_client::controller::IntrospectionType;
27use qcell::QCell;
28use rand::SeedableRng;
29use rand::{distributions::Bernoulli, prelude::Distribution, thread_rng};
30use sha2::{Digest, Sha256};
31use tokio::time::MissedTickBehavior;
32use tracing::debug;
33use uuid::Uuid;
34
35use crate::coord::{ConnMeta, Coordinator};
36use crate::session::Session;
37use crate::statement_logging::{
38 SessionHistoryEvent, StatementBeganExecutionRecord, StatementEndedExecutionReason,
39 StatementEndedExecutionRecord, StatementLifecycleEvent, StatementPreparedRecord,
40};
41
42use super::Message;
43
44#[derive(Debug)]
46pub enum PreparedStatementLoggingInfo {
47 AlreadyLogged { uuid: Uuid },
51 StillToLog {
54 sql: String,
56 redacted_sql: String,
59 prepared_at: EpochMillis,
61 name: String,
63 session_id: Uuid,
65 accounted: bool,
67 kind: Option<StatementKind>,
69
70 _sealed: sealed::Private,
73 },
74}
75
76impl PreparedStatementLoggingInfo {
77 pub fn still_to_log<A: AstInfo>(
80 raw_sql: String,
81 stmt: Option<&Statement<A>>,
82 prepared_at: EpochMillis,
83 name: String,
84 session_id: Uuid,
85 accounted: bool,
86 ) -> Self {
87 let kind = stmt.map(StatementKind::from);
88 let sql = match kind {
89 Some(StatementKind::CreateSecret | StatementKind::AlterSecret) => {
91 stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default()
92 }
93 _ => raw_sql,
94 };
95
96 PreparedStatementLoggingInfo::StillToLog {
97 sql,
98 redacted_sql: stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
99 prepared_at,
100 name,
101 session_id,
102 accounted,
103 kind,
104 _sealed: sealed::Private,
105 }
106 }
107}
108
109#[derive(Copy, Clone, Debug, Ord, Eq, PartialOrd, PartialEq)]
110pub struct StatementLoggingId(Uuid);
111
112#[derive(Debug)]
113pub(crate) struct PreparedStatementEvent {
114 prepared_statement: Row,
115 sql_text: Row,
116}
117
118#[derive(Debug)]
119pub(crate) struct StatementLogging {
120 executions_begun: BTreeMap<Uuid, StatementBeganExecutionRecord>,
127
128 unlogged_sessions: BTreeMap<Uuid, SessionHistoryEvent>,
132
133 reproducible_rng: rand_chacha::ChaCha8Rng,
137
138 pending_statement_execution_events: Vec<(Row, Diff)>,
139 pending_prepared_statement_events: Vec<PreparedStatementEvent>,
140 pending_session_events: Vec<Row>,
141 pending_statement_lifecycle_events: Vec<Row>,
142
143 now: NowFn,
144
145 tokens: u64,
149 last_logged_ts_seconds: u64,
151 throttled_count: usize,
153}
154
155impl StatementLogging {
156 pub(crate) fn new(now: NowFn) -> Self {
157 let last_logged_ts_seconds = (now)() / 1000;
158 Self {
159 executions_begun: BTreeMap::new(),
160 unlogged_sessions: BTreeMap::new(),
161 reproducible_rng: rand_chacha::ChaCha8Rng::seed_from_u64(42),
162 pending_statement_execution_events: Vec::new(),
163 pending_prepared_statement_events: Vec::new(),
164 pending_session_events: Vec::new(),
165 pending_statement_lifecycle_events: Vec::new(),
166 tokens: 0,
167 last_logged_ts_seconds,
168 now: now.clone(),
169 throttled_count: 0,
170 }
171 }
172
173 fn throttling_check(
179 &mut self,
180 cost: u64,
181 target_data_rate: u64,
182 max_data_credit: Option<u64>,
183 ) -> Option<usize> {
184 let ts = (self.now)() / 1000;
185 let elapsed = ts - self.last_logged_ts_seconds;
186 self.last_logged_ts_seconds = ts;
187 self.tokens = self
188 .tokens
189 .saturating_add(target_data_rate.saturating_mul(elapsed));
190 if let Some(max_data_credit) = max_data_credit {
191 self.tokens = self.tokens.min(max_data_credit);
192 }
193 if let Some(remaining) = self.tokens.checked_sub(cost) {
194 debug!("throttling check passed. tokens remaining: {remaining}; cost: {cost}");
195 self.tokens = remaining;
196 Some(std::mem::take(&mut self.throttled_count))
197 } else {
198 debug!(
199 "throttling check failed. tokens available: {}; cost: {cost}",
200 self.tokens
201 );
202 self.throttled_count += 1;
203 None
204 }
205 }
206}
207
208impl Coordinator {
209 pub(crate) fn spawn_statement_logging_task(&self) {
210 let internal_cmd_tx = self.internal_cmd_tx.clone();
211 spawn(|| "statement_logging", async move {
212 let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
217 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
218 loop {
219 interval.tick().await;
220 let _ = internal_cmd_tx.send(Message::DrainStatementLog);
221 }
222 });
223 }
224
225 #[mz_ore::instrument(level = "debug")]
226 pub(crate) fn drain_statement_log(&mut self) {
227 let session_updates = std::mem::take(&mut self.statement_logging.pending_session_events)
228 .into_iter()
229 .map(|update| (update, Diff::ONE))
230 .collect();
231 let (prepared_statement_updates, sql_text_updates) =
232 std::mem::take(&mut self.statement_logging.pending_prepared_statement_events)
233 .into_iter()
234 .map(
235 |PreparedStatementEvent {
236 prepared_statement,
237 sql_text,
238 }| {
239 ((prepared_statement, Diff::ONE), (sql_text, Diff::ONE))
240 },
241 )
242 .unzip::<_, _, Vec<_>, Vec<_>>();
243 let statement_execution_updates =
244 std::mem::take(&mut self.statement_logging.pending_statement_execution_events);
245 let statement_lifecycle_updates =
246 std::mem::take(&mut self.statement_logging.pending_statement_lifecycle_events)
247 .into_iter()
248 .map(|update| (update, Diff::ONE))
249 .collect();
250
251 use IntrospectionType::*;
252 for (type_, updates) in [
253 (SessionHistory, session_updates),
254 (PreparedStatementHistory, prepared_statement_updates),
255 (StatementExecutionHistory, statement_execution_updates),
256 (StatementLifecycleHistory, statement_lifecycle_updates),
257 (SqlText, sql_text_updates),
258 ] {
259 if !updates.is_empty() && !self.controller.read_only() {
260 self.controller
261 .storage
262 .append_introspection_updates(type_, updates);
263 }
264 }
265 }
266
267 fn statement_logging_throttling_check(&mut self, cost: usize) -> Option<usize> {
273 let Some(target_data_rate) = self
274 .catalog
275 .system_config()
276 .statement_logging_target_data_rate()
277 else {
278 return Some(std::mem::take(&mut self.statement_logging.throttled_count));
279 };
280 let max_data_credit = self
281 .catalog
282 .system_config()
283 .statement_logging_max_data_credit();
284 self.statement_logging.throttling_check(
285 cost.cast_into(),
286 target_data_rate.cast_into(),
287 max_data_credit.map(CastInto::cast_into),
288 )
289 }
290
291 pub(crate) fn log_prepared_statement(
298 &mut self,
299 session: &mut Session,
300 logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
301 ) -> Option<(
302 Option<(StatementPreparedRecord, PreparedStatementEvent)>,
303 Uuid,
304 )> {
305 let logging = session.qcell_rw(&*logging);
306 let mut out = None;
307
308 let uuid = match logging {
309 PreparedStatementLoggingInfo::AlreadyLogged { uuid } => *uuid,
310 PreparedStatementLoggingInfo::StillToLog {
311 sql,
312 redacted_sql,
313 prepared_at,
314 name,
315 session_id,
316 accounted,
317 kind,
318 _sealed: _,
319 } => {
320 assert!(
321 *accounted,
322 "accounting for logging should be done in `begin_statement_execution`"
323 );
324 let uuid = epoch_to_uuid_v7(prepared_at);
325 let sql = std::mem::take(sql);
326 let redacted_sql = std::mem::take(redacted_sql);
327 let sql_hash: [u8; 32] = Sha256::digest(sql.as_bytes()).into();
328 let record = StatementPreparedRecord {
329 id: uuid,
330 sql_hash,
331 name: std::mem::take(name),
332 session_id: *session_id,
333 prepared_at: *prepared_at,
334 kind: *kind,
335 };
336 let mut mpsh_row = Row::default();
337 let mut mpsh_packer = mpsh_row.packer();
338 Self::pack_statement_prepared_update(&record, &mut mpsh_packer);
339 let sql_row = Row::pack([
340 Datum::TimestampTz(
341 to_datetime(*prepared_at)
342 .truncate_day()
343 .try_into()
344 .expect("must fit"),
345 ),
346 Datum::Bytes(sql_hash.as_slice()),
347 Datum::String(sql.as_str()),
348 Datum::String(redacted_sql.as_str()),
349 ]);
350
351 let cost = mpsh_packer.byte_len() + sql_row.byte_len();
352 let throttled_count = self.statement_logging_throttling_check(cost)?;
353 mpsh_packer.push(Datum::UInt64(throttled_count.try_into().expect("must fit")));
354 out = Some((
355 record,
356 PreparedStatementEvent {
357 prepared_statement: mpsh_row,
358 sql_text: sql_row,
359 },
360 ));
361
362 *logging = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
363 uuid
364 }
365 };
366 Some((out, uuid))
367 }
368 pub fn statement_execution_sample_rate(&self, session: &Session) -> f64 {
372 let system: f64 = self
373 .catalog()
374 .system_config()
375 .statement_logging_max_sample_rate()
376 .try_into()
377 .expect("value constrained to be convertible to f64");
378 let user: f64 = session
379 .vars()
380 .get_statement_logging_sample_rate()
381 .try_into()
382 .expect("value constrained to be convertible to f64");
383 f64::min(system, user)
384 }
385
386 pub fn end_statement_execution(
392 &mut self,
393 id: StatementLoggingId,
394 reason: StatementEndedExecutionReason,
395 ) {
396 let StatementLoggingId(uuid) = id;
397 let now = self.now();
398 let ended_record = StatementEndedExecutionRecord {
399 id: uuid,
400 reason,
401 ended_at: now,
402 };
403
404 let began_record = self
405 .statement_logging
406 .executions_begun
407 .remove(&uuid)
408 .expect(
409 "matched `begin_statement_execution` and `end_statement_execution` invocations",
410 );
411 for (row, diff) in
412 Self::pack_statement_ended_execution_updates(&began_record, &ended_record)
413 {
414 self.statement_logging
415 .pending_statement_execution_events
416 .push((row, diff));
417 }
418 self.record_statement_lifecycle_event(
419 &id,
420 &StatementLifecycleEvent::ExecutionFinished,
421 now,
422 );
423 }
424
425 fn pack_statement_execution_inner(
426 record: &StatementBeganExecutionRecord,
427 packer: &mut RowPacker,
428 ) {
429 let StatementBeganExecutionRecord {
430 id,
431 prepared_statement_id,
432 sample_rate,
433 params,
434 began_at,
435 cluster_id,
436 cluster_name,
437 database_name,
438 search_path,
439 application_name,
440 transaction_isolation,
441 execution_timestamp,
442 transaction_id,
443 transient_index_id,
444 mz_version,
445 } = record;
446
447 let cluster = cluster_id.map(|id| id.to_string());
448 let transient_index_id = transient_index_id.map(|id| id.to_string());
449 packer.extend([
450 Datum::Uuid(*id),
451 Datum::Uuid(*prepared_statement_id),
452 Datum::Float64((*sample_rate).into()),
453 match &cluster {
454 None => Datum::Null,
455 Some(cluster_id) => Datum::String(cluster_id),
456 },
457 Datum::String(&*application_name),
458 cluster_name.as_ref().map(String::as_str).into(),
459 Datum::String(database_name),
460 ]);
461 packer.push_list(search_path.iter().map(|s| Datum::String(s)));
462 packer.extend([
463 Datum::String(&*transaction_isolation),
464 (*execution_timestamp).into(),
465 Datum::UInt64(*transaction_id),
466 match &transient_index_id {
467 None => Datum::Null,
468 Some(transient_index_id) => Datum::String(transient_index_id),
469 },
470 ]);
471 packer
472 .try_push_array(
473 &[ArrayDimension {
474 lower_bound: 1,
475 length: params.len(),
476 }],
477 params
478 .iter()
479 .map(|p| Datum::from(p.as_ref().map(String::as_str))),
480 )
481 .expect("correct array dimensions");
482 packer.push(Datum::from(mz_version.as_str()));
483 packer.push(Datum::TimestampTz(
484 to_datetime(*began_at).try_into().expect("Sane system time"),
485 ));
486 }
487
488 fn pack_statement_began_execution_update(record: &StatementBeganExecutionRecord) -> Row {
489 let mut row = Row::default();
490 let mut packer = row.packer();
491 Self::pack_statement_execution_inner(record, &mut packer);
492 packer.extend([
493 Datum::Null,
495 Datum::Null,
497 Datum::Null,
499 Datum::Null,
501 Datum::Null,
503 Datum::Null,
505 ]);
506 row
507 }
508
509 fn pack_statement_prepared_update(record: &StatementPreparedRecord, packer: &mut RowPacker) {
510 let StatementPreparedRecord {
511 id,
512 session_id,
513 name,
514 sql_hash,
515 prepared_at,
516 kind,
517 } = record;
518 packer.extend([
519 Datum::Uuid(*id),
520 Datum::Uuid(*session_id),
521 Datum::String(name.as_str()),
522 Datum::Bytes(sql_hash.as_slice()),
523 Datum::TimestampTz(to_datetime(*prepared_at).try_into().expect("must fit")),
524 kind.map(statement_kind_label_value).into(),
525 ]);
526 }
527
528 fn pack_session_history_update(event: &SessionHistoryEvent) -> Row {
529 let SessionHistoryEvent {
530 id,
531 connected_at,
532 application_name,
533 authenticated_user,
534 } = event;
535 Row::pack_slice(&[
536 Datum::Uuid(*id),
537 Datum::TimestampTz(
538 mz_ore::now::to_datetime(*connected_at)
539 .try_into()
540 .expect("must fit"),
541 ),
542 Datum::String(&*application_name),
543 Datum::String(&*authenticated_user),
544 ])
545 }
546
547 fn pack_statement_lifecycle_event(
548 StatementLoggingId(uuid): &StatementLoggingId,
549 event: &StatementLifecycleEvent,
550 when: EpochMillis,
551 ) -> Row {
552 Row::pack_slice(&[
553 Datum::Uuid(*uuid),
554 Datum::String(event.as_str()),
555 Datum::TimestampTz(mz_ore::now::to_datetime(when).try_into().expect("must fit")),
556 ])
557 }
558
559 pub fn pack_full_statement_execution_update(
560 began_record: &StatementBeganExecutionRecord,
561 ended_record: &StatementEndedExecutionRecord,
562 ) -> Row {
563 let mut row = Row::default();
564 let mut packer = row.packer();
565 Self::pack_statement_execution_inner(began_record, &mut packer);
566 let (status, error_message, result_size, rows_returned, execution_strategy) =
567 match &ended_record.reason {
568 StatementEndedExecutionReason::Success {
569 result_size,
570 rows_returned,
571 execution_strategy,
572 } => (
573 "success",
574 None,
575 result_size.map(|rs| i64::try_from(rs).expect("must fit")),
576 rows_returned.map(|rr| i64::try_from(rr).expect("must fit")),
577 execution_strategy.map(|es| es.name()),
578 ),
579 StatementEndedExecutionReason::Canceled => ("canceled", None, None, None, None),
580 StatementEndedExecutionReason::Errored { error } => {
581 ("error", Some(error.as_str()), None, None, None)
582 }
583 StatementEndedExecutionReason::Aborted => ("aborted", None, None, None, None),
584 };
585 packer.extend([
586 Datum::TimestampTz(
587 to_datetime(ended_record.ended_at)
588 .try_into()
589 .expect("Sane system time"),
590 ),
591 status.into(),
592 error_message.into(),
593 result_size.into(),
594 rows_returned.into(),
595 execution_strategy.into(),
596 ]);
597 row
598 }
599
600 pub fn pack_statement_ended_execution_updates(
601 began_record: &StatementBeganExecutionRecord,
602 ended_record: &StatementEndedExecutionRecord,
603 ) -> [(Row, Diff); 2] {
604 let retraction = Self::pack_statement_began_execution_update(began_record);
605 let new = Self::pack_full_statement_execution_update(began_record, ended_record);
606 [(retraction, Diff::MINUS_ONE), (new, Diff::ONE)]
607 }
608
609 fn mutate_record<F: FnOnce(&mut StatementBeganExecutionRecord)>(
611 &mut self,
612 StatementLoggingId(id): StatementLoggingId,
613 f: F,
614 ) {
615 let record = self
616 .statement_logging
617 .executions_begun
618 .get_mut(&id)
619 .expect("mutate_record must not be called after execution ends");
620 let retraction = Self::pack_statement_began_execution_update(record);
621 self.statement_logging
622 .pending_statement_execution_events
623 .push((retraction, Diff::MINUS_ONE));
624 f(record);
625 let update = Self::pack_statement_began_execution_update(record);
626 self.statement_logging
627 .pending_statement_execution_events
628 .push((update, Diff::ONE));
629 }
630
631 pub fn set_statement_execution_cluster(
633 &mut self,
634 id: StatementLoggingId,
635 cluster_id: ClusterId,
636 ) {
637 let cluster_name = self.catalog().get_cluster(cluster_id).name.clone();
638 self.mutate_record(id, |record| {
639 record.cluster_name = Some(cluster_name);
640 record.cluster_id = Some(cluster_id);
641 });
642 }
643
644 pub fn set_statement_execution_timestamp(
646 &mut self,
647 id: StatementLoggingId,
648 timestamp: Timestamp,
649 ) {
650 self.mutate_record(id, |record| {
651 record.execution_timestamp = Some(u64::from(timestamp));
652 });
653 }
654
655 pub fn set_transient_index_id(&mut self, id: StatementLoggingId, transient_index_id: GlobalId) {
656 self.mutate_record(id, |record| {
657 record.transient_index_id = Some(transient_index_id)
658 });
659 }
660
661 pub fn begin_statement_execution(
665 &mut self,
666 session: &mut Session,
667 params: &Params,
668 logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
669 ) -> Option<StatementLoggingId> {
670 let enable_internal_statement_logging = self
671 .catalog()
672 .system_config()
673 .enable_internal_statement_logging();
674 if session.user().is_internal() && !enable_internal_statement_logging {
675 return None;
676 }
677 let sample_rate = self.statement_execution_sample_rate(session);
678
679 let distribution = Bernoulli::new(sample_rate).expect("rate must be in range [0, 1]");
680 let sample = if self
681 .catalog()
682 .system_config()
683 .statement_logging_use_reproducible_rng()
684 {
685 distribution.sample(&mut self.statement_logging.reproducible_rng)
686 } else {
687 distribution.sample(&mut thread_rng())
688 };
689
690 let sampled_label = sample.then_some("true").unwrap_or("false");
692 self.metrics
693 .statement_logging_records
694 .with_label_values(&[sampled_label])
695 .inc_by(1);
696
697 if let Some((sql, accounted)) = match session.qcell_rw(logging) {
698 PreparedStatementLoggingInfo::AlreadyLogged { .. } => None,
699 PreparedStatementLoggingInfo::StillToLog { sql, accounted, .. } => {
700 Some((sql, accounted))
701 }
702 } {
703 if !*accounted {
704 self.metrics
705 .statement_logging_unsampled_bytes
706 .with_label_values(&[])
707 .inc_by(u64::cast_from(sql.len()));
708 if sample {
709 self.metrics
710 .statement_logging_actual_bytes
711 .with_label_values(&[])
712 .inc_by(u64::cast_from(sql.len()));
713 }
714 *accounted = true;
715 }
716 }
717 if !sample {
718 return None;
719 }
720 let (ps_record, ps_uuid) = self.log_prepared_statement(session, logging)?;
721
722 let uuid = match session.qcell_rw(logging) {
723 PreparedStatementLoggingInfo::AlreadyLogged { uuid } => *uuid,
724 PreparedStatementLoggingInfo::StillToLog { prepared_at, .. } => {
725 epoch_to_uuid_v7(prepared_at)
726 }
727 };
728
729 let now = self.now();
730 self.record_statement_lifecycle_event(
731 &StatementLoggingId(uuid),
732 &StatementLifecycleEvent::ExecutionBegan,
733 now,
734 );
735
736 let params = std::iter::zip(params.types.iter(), params.datums.iter())
737 .map(|(r#type, datum)| {
738 mz_pgrepr::Value::from_datum(datum, r#type).map(|val| {
739 let mut buf = BytesMut::new();
740 val.encode_text(&mut buf);
741 String::from_utf8(Into::<Vec<u8>>::into(buf))
742 .expect("Serialization shouldn't produce non-UTF-8 strings.")
743 })
744 })
745 .collect();
746 let record = StatementBeganExecutionRecord {
747 id: uuid,
748 prepared_statement_id: ps_uuid,
749 sample_rate,
750 params,
751 began_at: self.now(),
752 application_name: session.application_name().to_string(),
753 transaction_isolation: session.vars().transaction_isolation().to_string(),
754 transaction_id: session
755 .transaction()
756 .inner()
757 .expect("Every statement runs in an explicit or implicit transaction")
758 .id,
759 mz_version: self
760 .catalog()
761 .state()
762 .config()
763 .build_info
764 .human_version(None),
765 cluster_id: None,
767 cluster_name: None,
768 execution_timestamp: None,
769 transient_index_id: None,
770 database_name: session.vars().database().into(),
771 search_path: session
772 .vars()
773 .search_path()
774 .iter()
775 .map(|s| s.as_str().to_string())
776 .collect(),
777 };
778 let mseh_update = Self::pack_statement_began_execution_update(&record);
779 self.statement_logging
780 .pending_statement_execution_events
781 .push((mseh_update, Diff::ONE));
782 self.statement_logging.executions_begun.insert(uuid, record);
783 if let Some((ps_record, ps_update)) = ps_record {
784 self.statement_logging
785 .pending_prepared_statement_events
786 .push(ps_update);
787 if let Some(sh) = self
788 .statement_logging
789 .unlogged_sessions
790 .remove(&ps_record.session_id)
791 {
792 let sh_update = Self::pack_session_history_update(&sh);
793 self.statement_logging
794 .pending_session_events
795 .push(sh_update);
796 }
797 }
798 Some(StatementLoggingId(uuid))
799 }
800
801 pub fn begin_session_for_statement_logging(&mut self, session: &ConnMeta) {
803 let id = session.uuid();
804 let session_role = session.authenticated_role_id();
805 let event = SessionHistoryEvent {
806 id,
807 connected_at: session.connected_at(),
808 application_name: session.application_name().to_owned(),
809 authenticated_user: self.catalog.get_role(session_role).name.clone(),
810 };
811 self.statement_logging.unlogged_sessions.insert(id, event);
812 }
813
814 pub fn end_session_for_statement_logging(&mut self, uuid: Uuid) {
815 self.statement_logging.unlogged_sessions.remove(&uuid);
816 }
817
818 pub fn record_statement_lifecycle_event(
819 &mut self,
820 id: &StatementLoggingId,
821 event: &StatementLifecycleEvent,
822 when: EpochMillis,
823 ) {
824 if mz_adapter_types::dyncfgs::ENABLE_STATEMENT_LIFECYCLE_LOGGING
825 .get(self.catalog().system_config().dyncfgs())
826 {
827 let row = Self::pack_statement_lifecycle_event(id, event, when);
828 self.statement_logging
829 .pending_statement_lifecycle_events
830 .push(row);
831 }
832 }
833}
834
835mod sealed {
836 #[derive(Debug, Copy, Clone)]
839 pub struct Private;
840}