1use std::collections::BTreeMap;
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14use mz_adapter_types::connection::ConnectionId;
15use mz_compute_client::controller::error::CollectionLookupError;
16use mz_controller_types::ClusterId;
17use mz_ore::now::{EpochMillis, NowFn, epoch_to_uuid_v7, to_datetime};
18use mz_ore::task::spawn;
19use mz_ore::{cast::CastFrom, cast::CastInto};
20use mz_repr::adt::timestamp::TimestampLike;
21use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
22use mz_sql::plan::Params;
23use mz_sql::session::metadata::SessionMetadata;
24use mz_storage_client::controller::IntrospectionType;
25use qcell::QCell;
26use rand::SeedableRng;
27use sha2::{Digest, Sha256};
28use tokio::time::MissedTickBehavior;
29use uuid::Uuid;
30
31use crate::coord::{ConnMeta, Coordinator, WatchSetResponse};
32use crate::session::{LifecycleTimestamps, Session};
33use crate::statement_logging::{
34 FrontendStatementLoggingEvent, PreparedStatementEvent, PreparedStatementLoggingInfo,
35 SessionHistoryEvent, StatementBeganExecutionRecord, StatementEndedExecutionReason,
36 StatementEndedExecutionRecord, StatementLifecycleEvent, StatementLoggingFrontend,
37 StatementLoggingId, StatementPreparedRecord, ThrottlingState, WatchSetCreation,
38 create_began_execution_record, effective_sample_rate, pack_statement_began_execution_update,
39 pack_statement_execution_inner, pack_statement_prepared_update, should_sample_statement,
40};
41
42use super::Message;
43
/// Coordinator-side state for statement logging.
///
/// Holds the records of in-flight executions, the sessions whose history row
/// has not been queued yet, and the buffers of rows that are waiting to be
/// flushed by `Coordinator::drain_statement_log`.
#[derive(Debug)]
pub(crate) struct StatementLogging {
    /// Records of executions that have begun but not yet ended, keyed by
    /// execution UUID. Entries are removed by `end_statement_execution`.
    executions_begun: BTreeMap<Uuid, StatementBeganExecutionRecord>,

    /// Sessions registered through `begin_session_for_statement_logging`
    /// whose session-history row has not been queued yet, keyed by session
    /// UUID.
    unlogged_sessions: BTreeMap<Uuid, SessionHistoryEvent>,

    /// RNG seeded with `REPRODUCIBLE_RNG_SEED`, used for the sampling decision
    /// when `statement_logging_use_reproducible_rng` is enabled. Shared with
    /// the frontends created by `create_frontend`.
    reproducible_rng: Arc<Mutex<rand_chacha::ChaCha8Rng>>,

    /// Statement execution history updates (row plus diff) awaiting the next
    /// drain.
    pending_statement_execution_events: Vec<(Row, Diff)>,
    /// Prepared statement events awaiting the next drain; each carries both a
    /// prepared-statement row and a SQL-text row.
    pending_prepared_statement_events: Vec<PreparedStatementEvent>,
    /// Session history rows awaiting the next drain.
    pending_session_events: Vec<Row>,
    /// Statement lifecycle rows awaiting the next drain.
    pending_statement_lifecycle_events: Vec<Row>,

    /// Throttling state consulted by the throttling check; shared with the
    /// frontends created by `create_frontend`.
    pub(crate) throttling_state: Arc<ThrottlingState>,

    /// Clock function passed to the throttling check and handed to frontends.
    pub(crate) now: NowFn,
}
78
79impl StatementLogging {
80 const REPRODUCIBLE_RNG_SEED: u64 = 42;
81
82 pub(crate) fn new(now: NowFn) -> Self {
83 Self {
84 executions_begun: BTreeMap::new(),
85 unlogged_sessions: BTreeMap::new(),
86 reproducible_rng: Arc::new(Mutex::new(rand_chacha::ChaCha8Rng::seed_from_u64(
87 Self::REPRODUCIBLE_RNG_SEED,
88 ))),
89 pending_statement_execution_events: Vec::new(),
90 pending_prepared_statement_events: Vec::new(),
91 pending_session_events: Vec::new(),
92 pending_statement_lifecycle_events: Vec::new(),
93 throttling_state: Arc::new(ThrottlingState::new(&now)),
94 now,
95 }
96 }
97
98 pub(crate) fn create_frontend(
103 &self,
104 build_info_human_version: String,
105 ) -> StatementLoggingFrontend {
106 StatementLoggingFrontend {
107 throttling_state: Arc::clone(&self.throttling_state),
108 reproducible_rng: Arc::clone(&self.reproducible_rng),
109 build_info_human_version,
110 now: self.now.clone(),
111 }
112 }
113}
114
115impl Coordinator {
116 fn write_began_execution_events(
119 &mut self,
120 record: StatementBeganExecutionRecord,
121 mseh_update: Row,
122 prepared_statement: Option<PreparedStatementEvent>,
123 ) {
124 self.statement_logging
126 .pending_statement_execution_events
127 .push((mseh_update, Diff::ONE));
128
129 self.statement_logging
131 .executions_begun
132 .insert(record.id, record);
133
134 if let Some(ps_event) = prepared_statement {
136 let session_id = ps_event.session_id;
137 self.statement_logging
138 .pending_prepared_statement_events
139 .push(ps_event);
140
141 if let Some(sh) = self.statement_logging.unlogged_sessions.remove(&session_id) {
143 let sh_update = Self::pack_session_history_update(&sh);
144 self.statement_logging
145 .pending_session_events
146 .push(sh_update);
147 }
148 }
149 }
150
151 pub(crate) fn handle_frontend_statement_logging_event(
153 &mut self,
154 event: FrontendStatementLoggingEvent,
155 ) {
156 match event {
157 FrontendStatementLoggingEvent::BeganExecution {
158 record,
159 mseh_update,
160 prepared_statement,
161 } => {
162 self.record_statement_lifecycle_event(
163 &StatementLoggingId(record.id),
164 &StatementLifecycleEvent::ExecutionBegan,
165 record.began_at,
166 );
167 self.write_began_execution_events(record, mseh_update, prepared_statement);
168 }
169 FrontendStatementLoggingEvent::EndedExecution(ended_record) => {
170 self.end_statement_execution(
171 StatementLoggingId(ended_record.id),
172 ended_record.reason,
173 );
174 }
175 FrontendStatementLoggingEvent::SetCluster { id, cluster_id } => {
176 self.set_statement_execution_cluster(id, cluster_id);
177 }
178 FrontendStatementLoggingEvent::SetTimestamp { id, timestamp } => {
179 self.set_statement_execution_timestamp(id, timestamp);
180 }
181 FrontendStatementLoggingEvent::SetTransientIndex {
182 id,
183 transient_index_id,
184 } => {
185 self.set_transient_index_id(id, transient_index_id);
186 }
187 FrontendStatementLoggingEvent::Lifecycle { id, event, when } => {
188 self.record_statement_lifecycle_event(&id, &event, when);
189 }
190 }
191 }
192
    /// Period of the timer in `spawn_statement_logging_task`, i.e. how often a
    /// `Message::DrainStatementLog` is sent.
    const STATEMENT_LOGGING_WRITE_INTERVAL: Duration = Duration::from_secs(5);
198
199 pub(crate) fn spawn_statement_logging_task(&self) {
200 let internal_cmd_tx = self.internal_cmd_tx.clone();
201 spawn(|| "statement_logging", async move {
202 let mut interval = tokio::time::interval(Coordinator::STATEMENT_LOGGING_WRITE_INTERVAL);
203 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
204 loop {
205 interval.tick().await;
206 let _ = internal_cmd_tx.send(Message::DrainStatementLog);
207 }
208 });
209 }
210
211 #[mz_ore::instrument(level = "debug")]
212 pub(crate) fn drain_statement_log(&mut self) {
213 let session_updates = std::mem::take(&mut self.statement_logging.pending_session_events)
214 .into_iter()
215 .map(|update| (update, Diff::ONE))
216 .collect();
217 let (prepared_statement_updates, sql_text_updates) =
218 std::mem::take(&mut self.statement_logging.pending_prepared_statement_events)
219 .into_iter()
220 .map(
221 |PreparedStatementEvent {
222 prepared_statement,
223 sql_text,
224 ..
225 }| {
226 ((prepared_statement, Diff::ONE), (sql_text, Diff::ONE))
227 },
228 )
229 .unzip::<_, _, Vec<_>, Vec<_>>();
230 let statement_execution_updates =
231 std::mem::take(&mut self.statement_logging.pending_statement_execution_events);
232 let statement_lifecycle_updates =
233 std::mem::take(&mut self.statement_logging.pending_statement_lifecycle_events)
234 .into_iter()
235 .map(|update| (update, Diff::ONE))
236 .collect();
237
238 use IntrospectionType::*;
239 for (type_, updates) in [
240 (SessionHistory, session_updates),
241 (PreparedStatementHistory, prepared_statement_updates),
242 (StatementExecutionHistory, statement_execution_updates),
243 (StatementLifecycleHistory, statement_lifecycle_updates),
244 (SqlText, sql_text_updates),
245 ] {
246 if !updates.is_empty() && !self.controller.read_only() {
247 self.controller
248 .storage
249 .append_introspection_updates(type_, updates);
250 }
251 }
252 }
253
254 fn statement_logging_throttling_check<'a, I>(&self, rows: I) -> bool
262 where
263 I: IntoIterator<Item = Option<&'a Row>>,
264 {
265 let cost = rows
266 .into_iter()
267 .filter_map(|row_opt| row_opt.map(|row| row.byte_len()))
268 .fold(0_usize, |acc, x| acc.saturating_add(x));
269
270 let Some(target_data_rate) = self
271 .catalog
272 .system_config()
273 .statement_logging_target_data_rate()
274 else {
275 return true;
276 };
277 let max_data_credit = self
278 .catalog
279 .system_config()
280 .statement_logging_max_data_credit();
281
282 self.statement_logging.throttling_state.throttling_check(
283 cost.cast_into(),
284 target_data_rate.cast_into(),
285 max_data_credit.map(CastInto::cast_into),
286 &self.statement_logging.now,
287 )
288 }
289
290 fn record_prepared_statement_as_logged(
293 &self,
294 uuid: Uuid,
295 session: &mut Session,
296 logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
297 ) {
298 let logging = session.qcell_rw(&*logging);
299 if let PreparedStatementLoggingInfo::StillToLog { .. } = logging {
300 *logging = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
301 }
302 }
303
304 pub(crate) fn get_prepared_statement_info(
316 &self,
317 session: &Session,
318 logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
319 ) -> (
320 Option<(StatementPreparedRecord, PreparedStatementEvent)>,
321 Uuid,
322 ) {
323 let logging = session.qcell_ro(&*logging);
324
325 match logging {
326 PreparedStatementLoggingInfo::AlreadyLogged { uuid } => (None, *uuid),
327 PreparedStatementLoggingInfo::StillToLog {
328 sql,
329 redacted_sql,
330 prepared_at,
331 name,
332 session_id,
333 accounted,
334 kind,
335 _sealed: _,
336 } => {
337 assert!(
338 *accounted,
339 "accounting for logging should be done in `begin_statement_execution`"
340 );
341 let uuid = epoch_to_uuid_v7(prepared_at);
342 let sql_hash: [u8; 32] = Sha256::digest(sql.as_bytes()).into();
343 let record = StatementPreparedRecord {
344 id: uuid,
345 sql_hash,
346 name: name.to_string(),
347 session_id: *session_id,
348 prepared_at: *prepared_at,
349 kind: *kind,
350 };
351
352 let mut mpsh_row = Row::default();
354 let mut mpsh_packer = mpsh_row.packer();
355 pack_statement_prepared_update(&record, &mut mpsh_packer);
356 let throttled_count = self
357 .statement_logging
358 .throttling_state
359 .get_throttled_count();
360 mpsh_packer.push(Datum::UInt64(CastFrom::cast_from(throttled_count)));
361
362 let sql_row = Row::pack([
363 Datum::TimestampTz(
364 to_datetime(*prepared_at)
365 .truncate_day()
366 .try_into()
367 .expect("must fit"),
368 ),
369 Datum::Bytes(sql_hash.as_slice()),
370 Datum::String(sql.as_str()),
371 Datum::String(redacted_sql.as_str()),
372 ]);
373
374 (
375 Some((
376 record,
377 PreparedStatementEvent {
378 prepared_statement: mpsh_row,
379 sql_text: sql_row,
380 session_id: *session_id,
381 },
382 )),
383 uuid,
384 )
385 }
386 }
387 }
388
389 pub(crate) fn end_statement_execution(
395 &mut self,
396 id: StatementLoggingId,
397 reason: StatementEndedExecutionReason,
398 ) {
399 let StatementLoggingId(uuid) = id;
400 let now = self.now();
401 let ended_record = StatementEndedExecutionRecord {
402 id: uuid,
403 reason,
404 ended_at: now,
405 };
406
407 let began_record = self
408 .statement_logging
409 .executions_begun
410 .remove(&uuid)
411 .expect(
412 "matched `begin_statement_execution` and `end_statement_execution` invocations",
413 );
414 for (row, diff) in
415 Self::pack_statement_ended_execution_updates(&began_record, &ended_record)
416 {
417 self.statement_logging
418 .pending_statement_execution_events
419 .push((row, diff));
420 }
421 self.record_statement_lifecycle_event(
422 &id,
423 &StatementLifecycleEvent::ExecutionFinished,
424 now,
425 );
426 }
427
428 fn pack_session_history_update(event: &SessionHistoryEvent) -> Row {
429 let SessionHistoryEvent {
430 id,
431 connected_at,
432 application_name,
433 authenticated_user,
434 } = event;
435 Row::pack_slice(&[
436 Datum::Uuid(*id),
437 Datum::TimestampTz(to_datetime(*connected_at).try_into().expect("must fit")),
438 Datum::String(&*application_name),
439 Datum::String(&*authenticated_user),
440 ])
441 }
442
443 fn pack_statement_lifecycle_event(
444 StatementLoggingId(uuid): &StatementLoggingId,
445 event: &StatementLifecycleEvent,
446 when: EpochMillis,
447 ) -> Row {
448 Row::pack_slice(&[
449 Datum::Uuid(*uuid),
450 Datum::String(event.as_str()),
451 Datum::TimestampTz(to_datetime(when).try_into().expect("must fit")),
452 ])
453 }
454
455 fn pack_full_statement_execution_update(
456 began_record: &StatementBeganExecutionRecord,
457 ended_record: &StatementEndedExecutionRecord,
458 ) -> Row {
459 let mut row = Row::default();
460 let mut packer = row.packer();
461 pack_statement_execution_inner(began_record, &mut packer);
462 let (status, error_message, result_size, rows_returned, execution_strategy) =
463 match &ended_record.reason {
464 StatementEndedExecutionReason::Success {
465 result_size,
466 rows_returned,
467 execution_strategy,
468 } => (
469 "success",
470 None,
471 result_size.map(|rs| i64::try_from(rs).expect("must fit")),
472 rows_returned.map(|rr| i64::try_from(rr).expect("must fit")),
473 execution_strategy.map(|es| es.name()),
474 ),
475 StatementEndedExecutionReason::Canceled => ("canceled", None, None, None, None),
476 StatementEndedExecutionReason::Errored { error } => {
477 ("error", Some(error.as_str()), None, None, None)
478 }
479 StatementEndedExecutionReason::Aborted => ("aborted", None, None, None, None),
480 };
481 packer.extend([
482 Datum::TimestampTz(
483 to_datetime(ended_record.ended_at)
484 .try_into()
485 .expect("Sane system time"),
486 ),
487 status.into(),
488 error_message.into(),
489 result_size.into(),
490 rows_returned.into(),
491 execution_strategy.into(),
492 ]);
493 row
494 }
495
496 fn pack_statement_ended_execution_updates(
497 began_record: &StatementBeganExecutionRecord,
498 ended_record: &StatementEndedExecutionRecord,
499 ) -> [(Row, Diff); 2] {
500 let retraction = pack_statement_began_execution_update(began_record);
501 let new = Self::pack_full_statement_execution_update(began_record, ended_record);
502 [(retraction, Diff::MINUS_ONE), (new, Diff::ONE)]
503 }
504
505 fn mutate_record<F: FnOnce(&mut StatementBeganExecutionRecord)>(
507 &mut self,
508 StatementLoggingId(id): StatementLoggingId,
509 f: F,
510 ) {
511 let record = self
512 .statement_logging
513 .executions_begun
514 .get_mut(&id)
515 .expect("mutate_record must not be called after execution ends");
516 let retraction = pack_statement_began_execution_update(record);
517 self.statement_logging
518 .pending_statement_execution_events
519 .push((retraction, Diff::MINUS_ONE));
520 f(record);
521 let update = pack_statement_began_execution_update(record);
522 self.statement_logging
523 .pending_statement_execution_events
524 .push((update, Diff::ONE));
525 }
526
527 pub(crate) fn set_statement_execution_cluster(
532 &mut self,
533 id: StatementLoggingId,
534 cluster_id: ClusterId,
535 ) {
536 let cluster_name = self.catalog().get_cluster(cluster_id).name.clone();
537 self.mutate_record(id, |record| {
538 record.cluster_name = Some(cluster_name);
539 record.cluster_id = Some(cluster_id);
540 });
541 }
542
543 pub(crate) fn set_statement_execution_timestamp(
545 &mut self,
546 id: StatementLoggingId,
547 timestamp: Timestamp,
548 ) {
549 self.mutate_record(id, |record| {
550 record.execution_timestamp = Some(u64::from(timestamp));
551 });
552 }
553
554 pub(crate) fn set_transient_index_id(
555 &mut self,
556 id: StatementLoggingId,
557 transient_index_id: GlobalId,
558 ) {
559 self.mutate_record(id, |record| {
560 record.transient_index_id = Some(transient_index_id)
561 });
562 }
563
564 pub(crate) fn begin_statement_execution(
571 &mut self,
572 session: &mut Session,
573 params: &Params,
574 logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
575 lifecycle_timestamps: Option<LifecycleTimestamps>,
576 ) -> Option<StatementLoggingId> {
577 let enable_internal_statement_logging = self
578 .catalog()
579 .system_config()
580 .enable_internal_statement_logging();
581 if session.user().is_internal() && !enable_internal_statement_logging {
582 return None;
583 }
584
585 let sample_rate = effective_sample_rate(session, self.catalog().system_config());
586 let use_reproducible_rng = self
587 .catalog()
588 .system_config()
589 .statement_logging_use_reproducible_rng();
590 let sample = if use_reproducible_rng {
592 let mut rng = self
593 .statement_logging
594 .reproducible_rng
595 .lock()
596 .expect("rng lock poisoned");
597 should_sample_statement(sample_rate, Some(&mut *rng))
598 } else {
599 should_sample_statement(sample_rate, None)
600 };
601
602 let sampled_label = sample.then_some("true").unwrap_or("false");
606 self.metrics
607 .statement_logging_records
608 .with_label_values(&[sampled_label])
609 .inc_by(1);
610
611 if let Some((sql, accounted)) = match session.qcell_rw(logging) {
612 PreparedStatementLoggingInfo::AlreadyLogged { .. } => None,
613 PreparedStatementLoggingInfo::StillToLog { sql, accounted, .. } => {
614 Some((sql, accounted))
615 }
616 } {
617 if !*accounted {
618 self.metrics
619 .statement_logging_unsampled_bytes
620 .inc_by(u64::cast_from(sql.len()));
621 if sample {
622 self.metrics
623 .statement_logging_actual_bytes
624 .inc_by(u64::cast_from(sql.len()));
625 }
626 *accounted = true;
627 }
628 }
629 if !sample {
630 return None;
631 }
632
633 let (maybe_ps, ps_uuid) = self.get_prepared_statement_info(session, logging);
634
635 let began_at = if let Some(lifecycle_timestamps) = lifecycle_timestamps {
636 lifecycle_timestamps.received
637 } else {
638 self.now()
639 };
640 let now = self.now();
641 let execution_uuid = epoch_to_uuid_v7(&now);
642
643 let build_info_version = self
644 .catalog()
645 .state()
646 .config()
647 .build_info
648 .human_version(None);
649 let record = create_began_execution_record(
650 execution_uuid,
651 ps_uuid,
652 sample_rate,
653 params,
654 session,
655 began_at,
656 build_info_version,
657 );
658
659 let mseh_update = pack_statement_began_execution_update(&record);
661
662 let (maybe_ps_event, maybe_sh_event) = if let Some((ps_record, ps_event)) = maybe_ps {
663 if let Some(sh) = self
664 .statement_logging
665 .unlogged_sessions
666 .get(&ps_record.session_id)
667 {
668 (
669 Some(ps_event),
670 Some((Self::pack_session_history_update(sh), ps_record.session_id)),
671 )
672 } else {
673 (Some(ps_event), None)
674 }
675 } else {
676 (None, None)
677 };
678
679 let maybe_ps_prepared_statement = maybe_ps_event.as_ref().map(|e| &e.prepared_statement);
680 let maybe_ps_sql_text = maybe_ps_event.as_ref().map(|e| &e.sql_text);
681
682 if !self.statement_logging_throttling_check([
683 Some(&mseh_update),
684 maybe_ps_prepared_statement,
685 maybe_ps_sql_text,
686 maybe_sh_event.as_ref().map(|(row, _)| row),
687 ]) {
688 self.statement_logging
690 .throttling_state
691 .increment_throttled_count();
692 return None;
693 }
694 else if let PreparedStatementLoggingInfo::StillToLog { .. } = session.qcell_ro(logging) {
699 self.statement_logging
700 .throttling_state
701 .reset_throttled_count();
702 }
703
704 self.record_prepared_statement_as_logged(ps_uuid, session, logging);
705
706 self.record_statement_lifecycle_event(
707 &StatementLoggingId(execution_uuid),
708 &StatementLifecycleEvent::ExecutionBegan,
709 began_at,
710 );
711
712 self.statement_logging
713 .pending_statement_execution_events
714 .push((mseh_update, Diff::ONE));
715 self.statement_logging
716 .executions_begun
717 .insert(execution_uuid, record);
718
719 if let Some((sh_update, session_id)) = maybe_sh_event {
720 self.statement_logging
721 .pending_session_events
722 .push(sh_update);
723 self.statement_logging.unlogged_sessions.remove(&session_id);
725 }
726 if let Some(ps_event) = maybe_ps_event {
727 self.statement_logging
728 .pending_prepared_statement_events
729 .push(ps_event);
730 }
731
732 Some(StatementLoggingId(execution_uuid))
733 }
734
735 pub(crate) fn begin_session_for_statement_logging(&mut self, session: &ConnMeta) {
737 let id = session.uuid();
738 let session_role = session.authenticated_role_id();
739 let event = SessionHistoryEvent {
740 id,
741 connected_at: session.connected_at(),
742 application_name: session.application_name().to_owned(),
743 authenticated_user: self.catalog.get_role(session_role).name.clone(),
744 };
745 self.statement_logging.unlogged_sessions.insert(id, event);
746 }
747
748 pub(crate) fn end_session_for_statement_logging(&mut self, uuid: Uuid) {
749 self.statement_logging.unlogged_sessions.remove(&uuid);
750 }
751
752 pub(crate) fn record_statement_lifecycle_event(
753 &mut self,
754 id: &StatementLoggingId,
755 event: &StatementLifecycleEvent,
756 when: EpochMillis,
757 ) {
758 if mz_adapter_types::dyncfgs::ENABLE_STATEMENT_LIFECYCLE_LOGGING
759 .get(self.catalog().system_config().dyncfgs())
760 {
761 let row = Self::pack_statement_lifecycle_event(id, event, when);
762 self.statement_logging
763 .pending_statement_lifecycle_events
764 .push(row);
765 }
766 }
767
768 pub(crate) fn install_peek_watch_sets(
775 &mut self,
776 conn_id: ConnectionId,
777 watch_set: WatchSetCreation,
778 ) -> Result<(), CollectionLookupError> {
779 let WatchSetCreation {
780 logging_id,
781 timestamp,
782 storage_ids,
783 compute_ids,
784 } = watch_set;
785
786 self.install_storage_watch_set(
787 conn_id.clone(),
788 storage_ids,
789 timestamp,
790 WatchSetResponse::StatementDependenciesReady(
791 logging_id,
792 StatementLifecycleEvent::StorageDependenciesFinished,
793 ),
794 )?;
795 self.install_compute_watch_set(
796 conn_id,
797 compute_ids,
798 timestamp,
799 WatchSetResponse::StatementDependenciesReady(
800 logging_id,
801 StatementLifecycleEvent::ComputeDependenciesFinished,
802 ),
803 )?;
804 Ok(())
805 }
806}