1use std::collections::BTreeMap;
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14use mz_adapter_types::connection::ConnectionId;
15use mz_compute_client::controller::error::CollectionLookupError;
16use mz_controller_types::ClusterId;
17use mz_ore::now::{EpochMillis, NowFn, epoch_to_uuid_v7, to_datetime};
18use mz_ore::task::spawn;
19use mz_ore::{cast::CastFrom, cast::CastInto, soft_panic_or_log};
20use mz_repr::adt::timestamp::TimestampLike;
21use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
22use mz_sql::plan::Params;
23use mz_sql::session::metadata::SessionMetadata;
24use mz_storage_client::controller::IntrospectionType;
25use qcell::QCell;
26use rand::SeedableRng;
27use sha2::{Digest, Sha256};
28use tokio::time::MissedTickBehavior;
29use uuid::Uuid;
30
31use crate::coord::{ConnMeta, Coordinator, WatchSetResponse};
32use crate::session::{LifecycleTimestamps, Session};
33use crate::statement_logging::{
34 FrontendStatementLoggingEvent, PreparedStatementEvent, PreparedStatementLoggingInfo,
35 SessionHistoryEvent, StatementBeganExecutionRecord, StatementEndedExecutionReason,
36 StatementEndedExecutionRecord, StatementLifecycleEvent, StatementLoggingFrontend,
37 StatementLoggingId, StatementPreparedRecord, ThrottlingState, WatchSetCreation,
38 create_began_execution_record, effective_sample_rate, pack_statement_began_execution_update,
39 pack_statement_execution_inner, pack_statement_prepared_update, should_sample_statement,
40};
41
42use super::Message;
43
/// Coordinator-side state for statement logging: records of executions that
/// are still in flight, sessions whose history row has not been queued yet,
/// and buffers of rows waiting for the next `drain_statement_log`.
#[derive(Debug)]
pub(crate) struct StatementLogging {
    /// Began-execution records keyed by execution UUID. An entry is inserted
    /// when an execution begins and removed by `end_statement_execution`.
    executions_begun: BTreeMap<Uuid, StatementBeganExecutionRecord>,

    /// Sessions registered by `begin_session_for_statement_logging` whose
    /// session-history row has not been queued yet, keyed by session UUID.
    unlogged_sessions: BTreeMap<Uuid, SessionHistoryEvent>,

    /// RNG seeded from `REPRODUCIBLE_RNG_SEED`; consulted for the sampling
    /// decision when `statement_logging_use_reproducible_rng` is enabled.
    /// The same `Arc` is handed to frontends built by `create_frontend`.
    reproducible_rng: Arc<Mutex<rand_chacha::ChaCha8Rng>>,

    /// Statement-execution rows, with their diffs, awaiting the next drain.
    pending_statement_execution_events: Vec<(Row, Diff)>,
    /// Prepared-statement events (history row plus SQL-text row) awaiting the
    /// next drain.
    pending_prepared_statement_events: Vec<PreparedStatementEvent>,
    /// Session-history rows awaiting the next drain; drained as insertions.
    pending_session_events: Vec<Row>,
    /// Statement-lifecycle rows awaiting the next drain; drained as insertions.
    pending_statement_lifecycle_events: Vec<Row>,

    /// Throttling state, shared through the `Arc` with frontends built by
    /// `create_frontend`.
    pub(crate) throttling_state: Arc<ThrottlingState>,

    /// Clock used by the throttling check and cloned into created frontends.
    pub(crate) now: NowFn,
}
78
79impl StatementLogging {
80 const REPRODUCIBLE_RNG_SEED: u64 = 42;
81
82 pub(crate) fn new(now: NowFn) -> Self {
83 Self {
84 executions_begun: BTreeMap::new(),
85 unlogged_sessions: BTreeMap::new(),
86 reproducible_rng: Arc::new(Mutex::new(rand_chacha::ChaCha8Rng::seed_from_u64(
87 Self::REPRODUCIBLE_RNG_SEED,
88 ))),
89 pending_statement_execution_events: Vec::new(),
90 pending_prepared_statement_events: Vec::new(),
91 pending_session_events: Vec::new(),
92 pending_statement_lifecycle_events: Vec::new(),
93 throttling_state: Arc::new(ThrottlingState::new(&now)),
94 now,
95 }
96 }
97
98 pub(crate) fn create_frontend(
103 &self,
104 build_info_human_version: String,
105 ) -> StatementLoggingFrontend {
106 StatementLoggingFrontend {
107 throttling_state: Arc::clone(&self.throttling_state),
108 reproducible_rng: Arc::clone(&self.reproducible_rng),
109 build_info_human_version,
110 now: self.now.clone(),
111 }
112 }
113}
114
115impl Coordinator {
116 fn write_began_execution_events(
119 &mut self,
120 record: StatementBeganExecutionRecord,
121 mseh_update: Row,
122 prepared_statement: Option<PreparedStatementEvent>,
123 ) {
124 self.statement_logging
126 .pending_statement_execution_events
127 .push((mseh_update, Diff::ONE));
128
129 self.statement_logging
131 .executions_begun
132 .insert(record.id, record);
133
134 if let Some(ps_event) = prepared_statement {
136 let session_id = ps_event.session_id;
137 self.statement_logging
138 .pending_prepared_statement_events
139 .push(ps_event);
140
141 if let Some(sh) = self.statement_logging.unlogged_sessions.remove(&session_id) {
143 let sh_update = Self::pack_session_history_update(&sh);
144 self.statement_logging
145 .pending_session_events
146 .push(sh_update);
147 }
148 }
149 }
150
151 pub(crate) fn handle_frontend_statement_logging_event(
153 &mut self,
154 event: FrontendStatementLoggingEvent,
155 ) {
156 match event {
157 FrontendStatementLoggingEvent::BeganExecution {
158 record,
159 mseh_update,
160 prepared_statement,
161 } => {
162 self.record_statement_lifecycle_event(
163 &StatementLoggingId(record.id),
164 &StatementLifecycleEvent::ExecutionBegan,
165 record.began_at,
166 );
167 self.write_began_execution_events(record, mseh_update, prepared_statement);
168 }
169 FrontendStatementLoggingEvent::EndedExecution(ended_record) => {
170 self.end_statement_execution(
171 StatementLoggingId(ended_record.id),
172 ended_record.reason,
173 );
174 }
175 FrontendStatementLoggingEvent::SetCluster {
176 id,
177 cluster_id,
178 cluster_name,
179 } => {
180 self.set_statement_execution_cluster(id, cluster_id, cluster_name);
181 }
182 FrontendStatementLoggingEvent::SetTimestamp { id, timestamp } => {
183 self.set_statement_execution_timestamp(id, timestamp);
184 }
185 FrontendStatementLoggingEvent::SetTransientIndex {
186 id,
187 transient_index_id,
188 } => {
189 self.set_transient_index_id(id, transient_index_id);
190 }
191 FrontendStatementLoggingEvent::Lifecycle { id, event, when } => {
192 self.record_statement_lifecycle_event(&id, &event, when);
193 }
194 }
195 }
196
197 const STATEMENT_LOGGING_WRITE_INTERVAL: Duration = Duration::from_secs(5);
202
203 pub(crate) fn spawn_statement_logging_task(&self) {
204 let internal_cmd_tx = self.internal_cmd_tx.clone();
205 spawn(|| "statement_logging", async move {
206 let mut interval = tokio::time::interval(Coordinator::STATEMENT_LOGGING_WRITE_INTERVAL);
207 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
208 loop {
209 interval.tick().await;
210 let _ = internal_cmd_tx.send(Message::DrainStatementLog);
211 }
212 });
213 }
214
215 #[mz_ore::instrument(level = "debug")]
216 pub(crate) fn drain_statement_log(&mut self) {
217 let session_updates = std::mem::take(&mut self.statement_logging.pending_session_events)
218 .into_iter()
219 .map(|update| (update, Diff::ONE))
220 .collect();
221 let (prepared_statement_updates, sql_text_updates) =
222 std::mem::take(&mut self.statement_logging.pending_prepared_statement_events)
223 .into_iter()
224 .map(
225 |PreparedStatementEvent {
226 prepared_statement,
227 sql_text,
228 ..
229 }| {
230 ((prepared_statement, Diff::ONE), (sql_text, Diff::ONE))
231 },
232 )
233 .unzip::<_, _, Vec<_>, Vec<_>>();
234 let statement_execution_updates =
235 std::mem::take(&mut self.statement_logging.pending_statement_execution_events);
236 let statement_lifecycle_updates =
237 std::mem::take(&mut self.statement_logging.pending_statement_lifecycle_events)
238 .into_iter()
239 .map(|update| (update, Diff::ONE))
240 .collect();
241
242 use IntrospectionType::*;
243 for (type_, updates) in [
244 (SessionHistory, session_updates),
245 (PreparedStatementHistory, prepared_statement_updates),
246 (StatementExecutionHistory, statement_execution_updates),
247 (StatementLifecycleHistory, statement_lifecycle_updates),
248 (SqlText, sql_text_updates),
249 ] {
250 if !updates.is_empty() && !self.controller.read_only() {
251 self.controller
252 .storage
253 .append_introspection_updates(type_, updates);
254 }
255 }
256 }
257
258 fn statement_logging_throttling_check<'a, I>(&self, rows: I) -> bool
266 where
267 I: IntoIterator<Item = Option<&'a Row>>,
268 {
269 let cost = rows
270 .into_iter()
271 .filter_map(|row_opt| row_opt.map(|row| row.byte_len()))
272 .fold(0_usize, |acc, x| acc.saturating_add(x));
273
274 let Some(target_data_rate) = self
275 .catalog
276 .system_config()
277 .statement_logging_target_data_rate()
278 else {
279 return true;
280 };
281 let max_data_credit = self
282 .catalog
283 .system_config()
284 .statement_logging_max_data_credit();
285
286 self.statement_logging.throttling_state.throttling_check(
287 cost.cast_into(),
288 target_data_rate.cast_into(),
289 max_data_credit.map(CastInto::cast_into),
290 &self.statement_logging.now,
291 )
292 }
293
294 fn record_prepared_statement_as_logged(
297 &self,
298 uuid: Uuid,
299 session: &mut Session,
300 logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
301 ) {
302 let logging = session.qcell_rw(&*logging);
303 if let PreparedStatementLoggingInfo::StillToLog { .. } = logging {
304 *logging = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
305 }
306 }
307
308 pub(crate) fn get_prepared_statement_info(
320 &self,
321 session: &Session,
322 logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
323 ) -> (
324 Option<(StatementPreparedRecord, PreparedStatementEvent)>,
325 Uuid,
326 ) {
327 let logging = session.qcell_ro(&*logging);
328
329 match logging {
330 PreparedStatementLoggingInfo::AlreadyLogged { uuid } => (None, *uuid),
331 PreparedStatementLoggingInfo::StillToLog {
332 sql,
333 redacted_sql,
334 prepared_at,
335 name,
336 session_id,
337 accounted,
338 kind,
339 _sealed: _,
340 } => {
341 assert!(
342 *accounted,
343 "accounting for logging should be done in `begin_statement_execution`"
344 );
345 let uuid = epoch_to_uuid_v7(prepared_at);
346 let sql_hash: [u8; 32] = Sha256::digest(sql.as_bytes()).into();
347 let record = StatementPreparedRecord {
348 id: uuid,
349 sql_hash,
350 name: name.to_string(),
351 session_id: *session_id,
352 prepared_at: *prepared_at,
353 kind: *kind,
354 };
355
356 let mut mpsh_row = Row::default();
358 let mut mpsh_packer = mpsh_row.packer();
359 pack_statement_prepared_update(&record, &mut mpsh_packer);
360 let throttled_count = self
361 .statement_logging
362 .throttling_state
363 .get_throttled_count();
364 mpsh_packer.push(Datum::UInt64(CastFrom::cast_from(throttled_count)));
365
366 let sql_row = Row::pack([
367 Datum::TimestampTz(
368 to_datetime(*prepared_at)
369 .truncate_day()
370 .try_into()
371 .expect("must fit"),
372 ),
373 Datum::Bytes(sql_hash.as_slice()),
374 Datum::String(sql.as_str()),
375 Datum::String(redacted_sql.as_str()),
376 ]);
377
378 (
379 Some((
380 record,
381 PreparedStatementEvent {
382 prepared_statement: mpsh_row,
383 sql_text: sql_row,
384 session_id: *session_id,
385 },
386 )),
387 uuid,
388 )
389 }
390 }
391 }
392
393 pub(crate) fn end_statement_execution(
402 &mut self,
403 id: StatementLoggingId,
404 reason: StatementEndedExecutionReason,
405 ) {
406 let StatementLoggingId(uuid) = id;
407 let now = self.now();
408 let ended_record = StatementEndedExecutionRecord {
409 id: uuid,
410 reason,
411 ended_at: now,
412 };
413
414 let Some(began_record) = self.statement_logging.executions_begun.remove(&uuid) else {
415 soft_panic_or_log!(
420 "duplicate end_statement_execution for statement {uuid}, reason: {:?}",
421 ended_record.reason
422 );
423 return;
424 };
425 for (row, diff) in
426 Self::pack_statement_ended_execution_updates(&began_record, &ended_record)
427 {
428 self.statement_logging
429 .pending_statement_execution_events
430 .push((row, diff));
431 }
432 self.record_statement_lifecycle_event(
433 &id,
434 &StatementLifecycleEvent::ExecutionFinished,
435 now,
436 );
437 }
438
439 fn pack_session_history_update(event: &SessionHistoryEvent) -> Row {
440 let SessionHistoryEvent {
441 id,
442 connected_at,
443 application_name,
444 authenticated_user,
445 } = event;
446 Row::pack_slice(&[
447 Datum::Uuid(*id),
448 Datum::TimestampTz(to_datetime(*connected_at).try_into().expect("must fit")),
449 Datum::String(&*application_name),
450 Datum::String(&*authenticated_user),
451 ])
452 }
453
454 fn pack_statement_lifecycle_event(
455 StatementLoggingId(uuid): &StatementLoggingId,
456 event: &StatementLifecycleEvent,
457 when: EpochMillis,
458 ) -> Row {
459 Row::pack_slice(&[
460 Datum::Uuid(*uuid),
461 Datum::String(event.as_str()),
462 Datum::TimestampTz(to_datetime(when).try_into().expect("must fit")),
463 ])
464 }
465
466 fn pack_full_statement_execution_update(
467 began_record: &StatementBeganExecutionRecord,
468 ended_record: &StatementEndedExecutionRecord,
469 ) -> Row {
470 let mut row = Row::default();
471 let mut packer = row.packer();
472 pack_statement_execution_inner(began_record, &mut packer);
473 let (status, error_message, result_size, rows_returned, execution_strategy) =
474 match &ended_record.reason {
475 StatementEndedExecutionReason::Success {
476 result_size,
477 rows_returned,
478 execution_strategy,
479 } => (
480 "success",
481 None,
482 result_size.map(|rs| i64::try_from(rs).expect("must fit")),
483 rows_returned.map(|rr| i64::try_from(rr).expect("must fit")),
484 execution_strategy.map(|es| es.name()),
485 ),
486 StatementEndedExecutionReason::Canceled => ("canceled", None, None, None, None),
487 StatementEndedExecutionReason::Errored { error } => {
488 ("error", Some(error.as_str()), None, None, None)
489 }
490 StatementEndedExecutionReason::Aborted => ("aborted", None, None, None, None),
491 };
492 packer.extend([
493 Datum::TimestampTz(
494 to_datetime(ended_record.ended_at)
495 .try_into()
496 .expect("Sane system time"),
497 ),
498 status.into(),
499 error_message.into(),
500 result_size.into(),
501 rows_returned.into(),
502 execution_strategy.into(),
503 ]);
504 row
505 }
506
507 fn pack_statement_ended_execution_updates(
508 began_record: &StatementBeganExecutionRecord,
509 ended_record: &StatementEndedExecutionRecord,
510 ) -> [(Row, Diff); 2] {
511 let retraction = pack_statement_began_execution_update(began_record);
512 let new = Self::pack_full_statement_execution_update(began_record, ended_record);
513 [(retraction, Diff::MINUS_ONE), (new, Diff::ONE)]
514 }
515
516 fn mutate_record<F: FnOnce(&mut StatementBeganExecutionRecord)>(
518 &mut self,
519 StatementLoggingId(id): StatementLoggingId,
520 f: F,
521 ) {
522 let record = self
523 .statement_logging
524 .executions_begun
525 .get_mut(&id)
526 .expect("mutate_record must not be called after execution ends");
527 let retraction = pack_statement_began_execution_update(record);
528 self.statement_logging
529 .pending_statement_execution_events
530 .push((retraction, Diff::MINUS_ONE));
531 f(record);
532 let update = pack_statement_began_execution_update(record);
533 self.statement_logging
534 .pending_statement_execution_events
535 .push((update, Diff::ONE));
536 }
537
538 pub(crate) fn set_statement_execution_cluster(
546 &mut self,
547 id: StatementLoggingId,
548 cluster_id: ClusterId,
549 cluster_name: String,
550 ) {
551 self.mutate_record(id, |record| {
552 record.cluster_name = Some(cluster_name);
553 record.cluster_id = Some(cluster_id);
554 });
555 }
556
557 pub(crate) fn set_statement_execution_timestamp(
559 &mut self,
560 id: StatementLoggingId,
561 timestamp: Timestamp,
562 ) {
563 self.mutate_record(id, |record| {
564 record.execution_timestamp = Some(u64::from(timestamp));
565 });
566 }
567
568 pub(crate) fn set_transient_index_id(
569 &mut self,
570 id: StatementLoggingId,
571 transient_index_id: GlobalId,
572 ) {
573 self.mutate_record(id, |record| {
574 record.transient_index_id = Some(transient_index_id)
575 });
576 }
577
578 pub(crate) fn begin_statement_execution(
585 &mut self,
586 session: &mut Session,
587 params: &Params,
588 logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
589 lifecycle_timestamps: Option<LifecycleTimestamps>,
590 ) -> Option<StatementLoggingId> {
591 let enable_internal_statement_logging = self
592 .catalog()
593 .system_config()
594 .enable_internal_statement_logging();
595 if session.user().is_internal() && !enable_internal_statement_logging {
596 return None;
597 }
598
599 let sample_rate = effective_sample_rate(session, self.catalog().system_config());
600 let use_reproducible_rng = self
601 .catalog()
602 .system_config()
603 .statement_logging_use_reproducible_rng();
604 let sample = if use_reproducible_rng {
606 let mut rng = self
607 .statement_logging
608 .reproducible_rng
609 .lock()
610 .expect("rng lock poisoned");
611 should_sample_statement(sample_rate, Some(&mut *rng))
612 } else {
613 should_sample_statement(sample_rate, None)
614 };
615
616 let sampled_label = sample.then_some("true").unwrap_or("false");
620 self.metrics
621 .statement_logging_records
622 .with_label_values(&[sampled_label])
623 .inc_by(1);
624
625 if let Some((sql, accounted)) = match session.qcell_rw(logging) {
626 PreparedStatementLoggingInfo::AlreadyLogged { .. } => None,
627 PreparedStatementLoggingInfo::StillToLog { sql, accounted, .. } => {
628 Some((sql, accounted))
629 }
630 } {
631 if !*accounted {
632 self.metrics
633 .statement_logging_unsampled_bytes
634 .inc_by(u64::cast_from(sql.len()));
635 if sample {
636 self.metrics
637 .statement_logging_actual_bytes
638 .inc_by(u64::cast_from(sql.len()));
639 }
640 *accounted = true;
641 }
642 }
643 if !sample {
644 return None;
645 }
646
647 let (maybe_ps, ps_uuid) = self.get_prepared_statement_info(session, logging);
648
649 let began_at = if let Some(lifecycle_timestamps) = lifecycle_timestamps {
650 lifecycle_timestamps.received
651 } else {
652 self.now()
653 };
654 let now = self.now();
655 let execution_uuid = epoch_to_uuid_v7(&now);
656
657 let build_info_version = self
658 .catalog()
659 .state()
660 .config()
661 .build_info
662 .human_version(None);
663 let record = create_began_execution_record(
664 execution_uuid,
665 ps_uuid,
666 sample_rate,
667 params,
668 session,
669 began_at,
670 build_info_version,
671 );
672
673 let mseh_update = pack_statement_began_execution_update(&record);
675
676 let (maybe_ps_event, maybe_sh_event) = if let Some((ps_record, ps_event)) = maybe_ps {
677 if let Some(sh) = self
678 .statement_logging
679 .unlogged_sessions
680 .get(&ps_record.session_id)
681 {
682 (
683 Some(ps_event),
684 Some((Self::pack_session_history_update(sh), ps_record.session_id)),
685 )
686 } else {
687 (Some(ps_event), None)
688 }
689 } else {
690 (None, None)
691 };
692
693 let maybe_ps_prepared_statement = maybe_ps_event.as_ref().map(|e| &e.prepared_statement);
694 let maybe_ps_sql_text = maybe_ps_event.as_ref().map(|e| &e.sql_text);
695
696 if !self.statement_logging_throttling_check([
697 Some(&mseh_update),
698 maybe_ps_prepared_statement,
699 maybe_ps_sql_text,
700 maybe_sh_event.as_ref().map(|(row, _)| row),
701 ]) {
702 self.statement_logging
704 .throttling_state
705 .increment_throttled_count();
706 return None;
707 }
708 else if let PreparedStatementLoggingInfo::StillToLog { .. } = session.qcell_ro(logging) {
713 self.statement_logging
714 .throttling_state
715 .reset_throttled_count();
716 }
717
718 self.record_prepared_statement_as_logged(ps_uuid, session, logging);
719
720 self.record_statement_lifecycle_event(
721 &StatementLoggingId(execution_uuid),
722 &StatementLifecycleEvent::ExecutionBegan,
723 began_at,
724 );
725
726 self.statement_logging
727 .pending_statement_execution_events
728 .push((mseh_update, Diff::ONE));
729 self.statement_logging
730 .executions_begun
731 .insert(execution_uuid, record);
732
733 if let Some((sh_update, session_id)) = maybe_sh_event {
734 self.statement_logging
735 .pending_session_events
736 .push(sh_update);
737 self.statement_logging.unlogged_sessions.remove(&session_id);
739 }
740 if let Some(ps_event) = maybe_ps_event {
741 self.statement_logging
742 .pending_prepared_statement_events
743 .push(ps_event);
744 }
745
746 Some(StatementLoggingId(execution_uuid))
747 }
748
749 pub(crate) fn begin_session_for_statement_logging(&mut self, session: &ConnMeta) {
751 let id = session.uuid();
752 let session_role = session.authenticated_role_id();
753 let event = SessionHistoryEvent {
754 id,
755 connected_at: session.connected_at(),
756 application_name: session.application_name().to_owned(),
757 authenticated_user: self.catalog.get_role(session_role).name.clone(),
758 };
759 self.statement_logging.unlogged_sessions.insert(id, event);
760 }
761
762 pub(crate) fn end_session_for_statement_logging(&mut self, uuid: Uuid) {
763 self.statement_logging.unlogged_sessions.remove(&uuid);
764 }
765
766 pub(crate) fn record_statement_lifecycle_event(
767 &mut self,
768 id: &StatementLoggingId,
769 event: &StatementLifecycleEvent,
770 when: EpochMillis,
771 ) {
772 if mz_adapter_types::dyncfgs::ENABLE_STATEMENT_LIFECYCLE_LOGGING
773 .get(self.catalog().system_config().dyncfgs())
774 {
775 let row = Self::pack_statement_lifecycle_event(id, event, when);
776 self.statement_logging
777 .pending_statement_lifecycle_events
778 .push(row);
779 }
780 }
781
782 pub(crate) fn install_peek_watch_sets(
789 &mut self,
790 conn_id: ConnectionId,
791 watch_set: WatchSetCreation,
792 ) -> Result<(), CollectionLookupError> {
793 let WatchSetCreation {
794 logging_id,
795 timestamp,
796 storage_ids,
797 compute_ids,
798 } = watch_set;
799
800 self.install_storage_watch_set(
801 conn_id.clone(),
802 storage_ids,
803 timestamp,
804 WatchSetResponse::StatementDependenciesReady(
805 logging_id,
806 StatementLifecycleEvent::StorageDependenciesFinished,
807 ),
808 )?;
809 self.install_compute_watch_set(
810 conn_id,
811 compute_ids,
812 timestamp,
813 WatchSetResponse::StatementDependenciesReady(
814 logging_id,
815 StatementLifecycleEvent::ComputeDependenciesFinished,
816 ),
817 )?;
818 Ok(())
819 }
820}