Skip to main content

mz_adapter/coord/
statement_logging.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14use mz_adapter_types::connection::ConnectionId;
15use mz_compute_client::controller::error::CollectionLookupError;
16use mz_controller_types::ClusterId;
17use mz_ore::now::{EpochMillis, NowFn, epoch_to_uuid_v7, to_datetime};
18use mz_ore::task::spawn;
19use mz_ore::{cast::CastFrom, cast::CastInto, soft_panic_or_log};
20use mz_repr::adt::timestamp::TimestampLike;
21use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
22use mz_sql::plan::Params;
23use mz_sql::session::metadata::SessionMetadata;
24use mz_storage_client::controller::IntrospectionType;
25use qcell::QCell;
26use rand::SeedableRng;
27use sha2::{Digest, Sha256};
28use tokio::time::MissedTickBehavior;
29use uuid::Uuid;
30
31use crate::coord::{ConnMeta, Coordinator, WatchSetResponse};
32use crate::session::{LifecycleTimestamps, Session};
33use crate::statement_logging::{
34    FrontendStatementLoggingEvent, PreparedStatementEvent, PreparedStatementLoggingInfo,
35    SessionHistoryEvent, StatementBeganExecutionRecord, StatementEndedExecutionReason,
36    StatementEndedExecutionRecord, StatementLifecycleEvent, StatementLoggingFrontend,
37    StatementLoggingId, StatementPreparedRecord, ThrottlingState, WatchSetCreation,
38    create_began_execution_record, effective_sample_rate, pack_statement_began_execution_update,
39    pack_statement_execution_inner, pack_statement_prepared_update, should_sample_statement,
40};
41
42use super::Message;
43
/// Statement logging state in the Coordinator.
///
/// Events are buffered in the `pending_*` vectors below and are flushed to the
/// corresponding introspection collections by `Coordinator::drain_statement_log`.
#[derive(Debug)]
pub(crate) struct StatementLogging {
    /// Information about statement executions that have been logged
    /// but not finished.
    ///
    /// This map needs to have enough state left over to later retract
    /// the system table entries (so that we can update them when the
    /// execution finished.)
    executions_begun: BTreeMap<Uuid, StatementBeganExecutionRecord>,

    /// Information about sessions that have been started, but which
    /// have not yet been logged in `mz_session_history`.
    /// They may be logged as part of a statement being executed (and chosen for logging).
    unlogged_sessions: BTreeMap<Uuid, SessionHistoryEvent>,

    /// A reproducible RNG for deciding whether to sample statement executions.
    /// Only used by tests; otherwise, `rand::rng()` is used.
    /// Controlled by the system var `statement_logging_use_reproducible_rng`.
    /// This same instance will be used by all frontend tasks.
    reproducible_rng: Arc<Mutex<rand_chacha::ChaCha8Rng>>,

    /// Events to be persisted periodically.
    ///
    /// `(row, diff)` updates destined for `IntrospectionType::StatementExecutionHistory`.
    /// Besides insertions, this holds retractions (`Diff::MINUS_ONE`) of earlier
    /// rows whenever an in-flight execution record is updated or finished.
    pending_statement_execution_events: Vec<(Row, Diff)>,
    /// Prepared statements awaiting persistence. Each event contributes one row to
    /// `IntrospectionType::PreparedStatementHistory` and one row to
    /// `IntrospectionType::SqlText`.
    pending_prepared_statement_events: Vec<PreparedStatementEvent>,
    /// Rows destined for `IntrospectionType::SessionHistory`; inserted with
    /// `Diff::ONE` when drained.
    pending_session_events: Vec<Row>,
    /// Rows destined for `IntrospectionType::StatementLifecycleHistory`; inserted
    /// with `Diff::ONE` when drained.
    pending_statement_lifecycle_events: Vec<Row>,

    /// Shared throttling state for rate-limiting statement logging.
    /// It is shared (via `Arc`) with every frontend created by `create_frontend`.
    pub(crate) throttling_state: Arc<ThrottlingState>,

    /// Function to get the current time.
    pub(crate) now: NowFn,
}
78
79impl StatementLogging {
80    const REPRODUCIBLE_RNG_SEED: u64 = 42;
81
82    pub(crate) fn new(now: NowFn) -> Self {
83        Self {
84            executions_begun: BTreeMap::new(),
85            unlogged_sessions: BTreeMap::new(),
86            reproducible_rng: Arc::new(Mutex::new(rand_chacha::ChaCha8Rng::seed_from_u64(
87                Self::REPRODUCIBLE_RNG_SEED,
88            ))),
89            pending_statement_execution_events: Vec::new(),
90            pending_prepared_statement_events: Vec::new(),
91            pending_session_events: Vec::new(),
92            pending_statement_lifecycle_events: Vec::new(),
93            throttling_state: Arc::new(ThrottlingState::new(&now)),
94            now,
95        }
96    }
97
98    /// Create a `StatementLoggingFrontend` for use by frontend peek sequencing.
99    ///
100    /// This provides the frontend with all the state it needs to perform statement
101    /// logging without direct access to the Coordinator.
102    pub(crate) fn create_frontend(
103        &self,
104        build_info_human_version: String,
105    ) -> StatementLoggingFrontend {
106        StatementLoggingFrontend {
107            throttling_state: Arc::clone(&self.throttling_state),
108            reproducible_rng: Arc::clone(&self.reproducible_rng),
109            build_info_human_version,
110            now: self.now.clone(),
111        }
112    }
113}
114
115impl Coordinator {
116    /// Helper to write began execution events to pending buffers.
117    /// Can be called from both old and new peek sequencing.
118    fn write_began_execution_events(
119        &mut self,
120        record: StatementBeganExecutionRecord,
121        mseh_update: Row,
122        prepared_statement: Option<PreparedStatementEvent>,
123    ) {
124        // `mz_statement_execution_history`
125        self.statement_logging
126            .pending_statement_execution_events
127            .push((mseh_update, Diff::ONE));
128
129        // Track the execution for later updates
130        self.statement_logging
131            .executions_begun
132            .insert(record.id, record);
133
134        // If we have a prepared statement, log it and possibly its session
135        if let Some(ps_event) = prepared_statement {
136            let session_id = ps_event.session_id;
137            self.statement_logging
138                .pending_prepared_statement_events
139                .push(ps_event);
140
141            // Check if we need to log the session for this prepared statement
142            if let Some(sh) = self.statement_logging.unlogged_sessions.remove(&session_id) {
143                let sh_update = Self::pack_session_history_update(&sh);
144                self.statement_logging
145                    .pending_session_events
146                    .push(sh_update);
147            }
148        }
149    }
150
151    /// Handle a statement logging event from frontend peek sequencing.
152    pub(crate) fn handle_frontend_statement_logging_event(
153        &mut self,
154        event: FrontendStatementLoggingEvent,
155    ) {
156        match event {
157            FrontendStatementLoggingEvent::BeganExecution {
158                record,
159                mseh_update,
160                prepared_statement,
161            } => {
162                self.record_statement_lifecycle_event(
163                    &StatementLoggingId(record.id),
164                    &StatementLifecycleEvent::ExecutionBegan,
165                    record.began_at,
166                );
167                self.write_began_execution_events(record, mseh_update, prepared_statement);
168            }
169            FrontendStatementLoggingEvent::EndedExecution(ended_record) => {
170                self.end_statement_execution(
171                    StatementLoggingId(ended_record.id),
172                    ended_record.reason,
173                );
174            }
175            FrontendStatementLoggingEvent::SetCluster {
176                id,
177                cluster_id,
178                cluster_name,
179            } => {
180                self.set_statement_execution_cluster(id, cluster_id, cluster_name);
181            }
182            FrontendStatementLoggingEvent::SetTimestamp { id, timestamp } => {
183                self.set_statement_execution_timestamp(id, timestamp);
184            }
185            FrontendStatementLoggingEvent::SetTransientIndex {
186                id,
187                transient_index_id,
188            } => {
189                self.set_transient_index_id(id, transient_index_id);
190            }
191            FrontendStatementLoggingEvent::Lifecycle { id, event, when } => {
192                self.record_statement_lifecycle_event(&id, &event, when);
193            }
194        }
195    }
196
    // TODO[btv] make this configurable via LD?
    // Although... Logging every 5 seconds seems like it
    // should have acceptable cost for now, since we do a
    // group commit for tables every 1s anyway.
    /// How often `spawn_statement_logging_task` asks the coordinator to drain
    /// buffered statement-logging events (`Message::DrainStatementLog`).
    const STATEMENT_LOGGING_WRITE_INTERVAL: Duration = Duration::from_secs(5);
202
203    pub(crate) fn spawn_statement_logging_task(&self) {
204        let internal_cmd_tx = self.internal_cmd_tx.clone();
205        spawn(|| "statement_logging", async move {
206            let mut interval = tokio::time::interval(Coordinator::STATEMENT_LOGGING_WRITE_INTERVAL);
207            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
208            loop {
209                interval.tick().await;
210                let _ = internal_cmd_tx.send(Message::DrainStatementLog);
211            }
212        });
213    }
214
215    #[mz_ore::instrument(level = "debug")]
216    pub(crate) fn drain_statement_log(&mut self) {
217        let session_updates = std::mem::take(&mut self.statement_logging.pending_session_events)
218            .into_iter()
219            .map(|update| (update, Diff::ONE))
220            .collect();
221        let (prepared_statement_updates, sql_text_updates) =
222            std::mem::take(&mut self.statement_logging.pending_prepared_statement_events)
223                .into_iter()
224                .map(
225                    |PreparedStatementEvent {
226                         prepared_statement,
227                         sql_text,
228                         ..
229                     }| {
230                        ((prepared_statement, Diff::ONE), (sql_text, Diff::ONE))
231                    },
232                )
233                .unzip::<_, _, Vec<_>, Vec<_>>();
234        let statement_execution_updates =
235            std::mem::take(&mut self.statement_logging.pending_statement_execution_events);
236        let statement_lifecycle_updates =
237            std::mem::take(&mut self.statement_logging.pending_statement_lifecycle_events)
238                .into_iter()
239                .map(|update| (update, Diff::ONE))
240                .collect();
241
242        use IntrospectionType::*;
243        for (type_, updates) in [
244            (SessionHistory, session_updates),
245            (PreparedStatementHistory, prepared_statement_updates),
246            (StatementExecutionHistory, statement_execution_updates),
247            (StatementLifecycleHistory, statement_lifecycle_updates),
248            (SqlText, sql_text_updates),
249        ] {
250            if !updates.is_empty() && !self.controller.read_only() {
251                self.controller
252                    .storage
253                    .append_introspection_updates(type_, updates);
254            }
255        }
256    }
257
258    /// Check whether we need to do throttling (i.e., whether `STATEMENT_LOGGING_TARGET_DATA_RATE` is set).
259    /// If so, actually do the check.
260    ///
261    /// We expect `rows` to be the list of rows we intend to record and calculate the cost by summing the
262    /// byte lengths of the rows.
263    ///
264    /// Returns `false` if we must throttle this statement, and `true` otherwise.
265    fn statement_logging_throttling_check<'a, I>(&self, rows: I) -> bool
266    where
267        I: IntoIterator<Item = Option<&'a Row>>,
268    {
269        let cost = rows
270            .into_iter()
271            .filter_map(|row_opt| row_opt.map(|row| row.byte_len()))
272            .fold(0_usize, |acc, x| acc.saturating_add(x));
273
274        let Some(target_data_rate) = self
275            .catalog
276            .system_config()
277            .statement_logging_target_data_rate()
278        else {
279            return true;
280        };
281        let max_data_credit = self
282            .catalog
283            .system_config()
284            .statement_logging_max_data_credit();
285
286        self.statement_logging.throttling_state.throttling_check(
287            cost.cast_into(),
288            target_data_rate.cast_into(),
289            max_data_credit.map(CastInto::cast_into),
290            &self.statement_logging.now,
291        )
292    }
293
294    /// Marks a prepared statement as "already logged".
295    /// Mutates the `PreparedStatementLoggingInfo` metadata.
296    fn record_prepared_statement_as_logged(
297        &self,
298        uuid: Uuid,
299        session: &mut Session,
300        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
301    ) {
302        let logging = session.qcell_rw(&*logging);
303        if let PreparedStatementLoggingInfo::StillToLog { .. } = logging {
304            *logging = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
305        }
306    }
307
    /// Returns any statement logging events needed for a particular
    /// prepared statement. This is a read-only operation that does not mutate
    /// the `PreparedStatementLoggingInfo` metadata.
    ///
    /// This function does not do a sampling check, and assumes we did so in a higher layer.
    /// It also does not do a throttling check - that is done separately in `begin_statement_execution`.
    ///
    /// Returns a tuple containing:
    /// - `Option<(StatementPreparedRecord, PreparedStatementEvent)>`: If the prepared statement
    ///   has not yet been logged, returns the prepared statement record and the packed rows.
    /// - `Uuid`: The UUID of the prepared statement. For a statement still to be logged,
    ///   this is derived from its `prepared_at` time via `epoch_to_uuid_v7`.
    ///
    /// # Panics
    ///
    /// Panics if the statement is `StillToLog` but its SQL has not been accounted
    /// for yet (`accounted == false`); that accounting must happen first, in
    /// `begin_statement_execution`.
    pub(crate) fn get_prepared_statement_info(
        &self,
        session: &Session,
        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
    ) -> (
        Option<(StatementPreparedRecord, PreparedStatementEvent)>,
        Uuid,
    ) {
        let logging = session.qcell_ro(&*logging);

        match logging {
            PreparedStatementLoggingInfo::AlreadyLogged { uuid } => (None, *uuid),
            PreparedStatementLoggingInfo::StillToLog {
                sql,
                redacted_sql,
                prepared_at,
                name,
                session_id,
                accounted,
                kind,
                _sealed: _,
            } => {
                assert!(
                    *accounted,
                    "accounting for logging should be done in `begin_statement_execution`"
                );
                let uuid = epoch_to_uuid_v7(prepared_at);
                // SHA-256 of the raw (unredacted) SQL text.
                let sql_hash: [u8; 32] = Sha256::digest(sql.as_bytes()).into();
                let record = StatementPreparedRecord {
                    id: uuid,
                    sql_hash,
                    name: name.to_string(),
                    session_id: *session_id,
                    prepared_at: *prepared_at,
                    kind: *kind,
                };

                // `mz_prepared_statement_history`
                let mut mpsh_row = Row::default();
                let mut mpsh_packer = mpsh_row.packer();
                pack_statement_prepared_update(&record, &mut mpsh_packer);
                // The row's final column is the number of executions throttled so
                // far; `begin_statement_execution` resets that count once this row
                // is actually logged.
                let throttled_count = self
                    .statement_logging
                    .throttling_state
                    .get_throttled_count();
                mpsh_packer.push(Datum::UInt64(CastFrom::cast_from(throttled_count)));

                // SQL text row (drained into `IntrospectionType::SqlText`):
                // prepared_at truncated to the day, SQL hash, SQL, redacted SQL.
                let sql_row = Row::pack([
                    Datum::TimestampTz(
                        to_datetime(*prepared_at)
                            .truncate_day()
                            .try_into()
                            .expect("must fit"),
                    ),
                    Datum::Bytes(sql_hash.as_slice()),
                    Datum::String(sql.as_str()),
                    Datum::String(redacted_sql.as_str()),
                ]);

                (
                    Some((
                        record,
                        PreparedStatementEvent {
                            prepared_statement: mpsh_row,
                            sql_text: sql_row,
                            session_id: *session_id,
                        },
                    )),
                    uuid,
                )
            }
        }
    }
392
393    /// Record the end of statement execution for a statement whose beginning was logged.
394    /// It is an error to call this function for a statement whose beginning was not logged
395    /// (because it was not sampled). Requiring the opaque `StatementLoggingId` type,
396    /// which is only instantiated by `begin_statement_execution` if the statement is actually logged,
397    /// should prevent this.
398    ///
399    /// It is also an error to end the same execution twice; the duplicate end is
400    /// reported and ignored, keeping the first end.
401    pub(crate) fn end_statement_execution(
402        &mut self,
403        id: StatementLoggingId,
404        reason: StatementEndedExecutionReason,
405    ) {
406        let StatementLoggingId(uuid) = id;
407        let now = self.now();
408        let ended_record = StatementEndedExecutionRecord {
409            id: uuid,
410            reason,
411            ended_at: now,
412        };
413
414        let Some(began_record) = self.statement_logging.executions_begun.remove(&uuid) else {
415            // A `StatementLoggingId` is only minted when a begin is logged, so
416            // a missing entry means this execution was already ended: some bug
417            // ended it twice. That's worth a loud report, but statement
418            // logging must never abort environmentd in production.
419            soft_panic_or_log!(
420                "duplicate end_statement_execution for statement {uuid}, reason: {:?}",
421                ended_record.reason
422            );
423            return;
424        };
425        for (row, diff) in
426            Self::pack_statement_ended_execution_updates(&began_record, &ended_record)
427        {
428            self.statement_logging
429                .pending_statement_execution_events
430                .push((row, diff));
431        }
432        self.record_statement_lifecycle_event(
433            &id,
434            &StatementLifecycleEvent::ExecutionFinished,
435            now,
436        );
437    }
438
439    fn pack_session_history_update(event: &SessionHistoryEvent) -> Row {
440        let SessionHistoryEvent {
441            id,
442            connected_at,
443            application_name,
444            authenticated_user,
445        } = event;
446        Row::pack_slice(&[
447            Datum::Uuid(*id),
448            Datum::TimestampTz(to_datetime(*connected_at).try_into().expect("must fit")),
449            Datum::String(&*application_name),
450            Datum::String(&*authenticated_user),
451        ])
452    }
453
454    fn pack_statement_lifecycle_event(
455        StatementLoggingId(uuid): &StatementLoggingId,
456        event: &StatementLifecycleEvent,
457        when: EpochMillis,
458    ) -> Row {
459        Row::pack_slice(&[
460            Datum::Uuid(*uuid),
461            Datum::String(event.as_str()),
462            Datum::TimestampTz(to_datetime(when).try_into().expect("must fit")),
463        ])
464    }
465
    /// Pack the complete `mz_statement_execution_history` row for an execution
    /// that has ended.
    ///
    /// The row begins with the columns written by `pack_statement_execution_inner`
    /// for `began_record`, followed by, in order: `ended_at`, status
    /// (`"success"`, `"canceled"`, `"error"`, or `"aborted"`), error message,
    /// result size, rows returned, and execution strategy name. The error
    /// message is only set for errors; the last three are only (optionally)
    /// set for successes. Otherwise they are null.
    ///
    /// # Panics
    ///
    /// Panics if `result_size` or `rows_returned` does not fit in an `i64`, or if
    /// `ended_at` cannot be converted into a `TimestampTz` datum.
    fn pack_full_statement_execution_update(
        began_record: &StatementBeganExecutionRecord,
        ended_record: &StatementEndedExecutionRecord,
    ) -> Row {
        let mut row = Row::default();
        let mut packer = row.packer();
        pack_statement_execution_inner(began_record, &mut packer);
        let (status, error_message, result_size, rows_returned, execution_strategy) =
            match &ended_record.reason {
                StatementEndedExecutionReason::Success {
                    result_size,
                    rows_returned,
                    execution_strategy,
                } => (
                    "success",
                    None,
                    result_size.map(|rs| i64::try_from(rs).expect("must fit")),
                    rows_returned.map(|rr| i64::try_from(rr).expect("must fit")),
                    execution_strategy.map(|es| es.name()),
                ),
                StatementEndedExecutionReason::Canceled => ("canceled", None, None, None, None),
                StatementEndedExecutionReason::Errored { error } => {
                    ("error", Some(error.as_str()), None, None, None)
                }
                StatementEndedExecutionReason::Aborted => ("aborted", None, None, None, None),
            };
        // Column order here must match the execution history schema.
        packer.extend([
            Datum::TimestampTz(
                to_datetime(ended_record.ended_at)
                    .try_into()
                    .expect("Sane system time"),
            ),
            status.into(),
            error_message.into(),
            result_size.into(),
            rows_returned.into(),
            execution_strategy.into(),
        ]);
        row
    }
506
507    fn pack_statement_ended_execution_updates(
508        began_record: &StatementBeganExecutionRecord,
509        ended_record: &StatementEndedExecutionRecord,
510    ) -> [(Row, Diff); 2] {
511        let retraction = pack_statement_began_execution_update(began_record);
512        let new = Self::pack_full_statement_execution_update(began_record, ended_record);
513        [(retraction, Diff::MINUS_ONE), (new, Diff::ONE)]
514    }
515
516    /// Mutate a statement execution record via the given function `f`.
517    fn mutate_record<F: FnOnce(&mut StatementBeganExecutionRecord)>(
518        &mut self,
519        StatementLoggingId(id): StatementLoggingId,
520        f: F,
521    ) {
522        let record = self
523            .statement_logging
524            .executions_begun
525            .get_mut(&id)
526            .expect("mutate_record must not be called after execution ends");
527        let retraction = pack_statement_began_execution_update(record);
528        self.statement_logging
529            .pending_statement_execution_events
530            .push((retraction, Diff::MINUS_ONE));
531        f(record);
532        let update = pack_statement_began_execution_update(record);
533        self.statement_logging
534            .pending_statement_execution_events
535            .push((update, Diff::ONE));
536    }
537
538    /// Set the `cluster_id` and `cluster_name` for a statement, once they're known.
539    ///
540    /// The name is resolved by the caller (from the same catalog snapshot that selected the
541    /// cluster), rather than re-resolved here from `cluster_id`. This avoids a panic when the
542    /// cluster was concurrently dropped, and logs the name as it was at selection time.
543    ///
544    /// TODO(peek-seq): We could do the packing in the frontend task, and just send over the rows.
545    pub(crate) fn set_statement_execution_cluster(
546        &mut self,
547        id: StatementLoggingId,
548        cluster_id: ClusterId,
549        cluster_name: String,
550    ) {
551        self.mutate_record(id, |record| {
552            record.cluster_name = Some(cluster_name);
553            record.cluster_id = Some(cluster_id);
554        });
555    }
556
557    /// Set the `execution_timestamp` for a statement, once it's known
558    pub(crate) fn set_statement_execution_timestamp(
559        &mut self,
560        id: StatementLoggingId,
561        timestamp: Timestamp,
562    ) {
563        self.mutate_record(id, |record| {
564            record.execution_timestamp = Some(u64::from(timestamp));
565        });
566    }
567
568    pub(crate) fn set_transient_index_id(
569        &mut self,
570        id: StatementLoggingId,
571        transient_index_id: GlobalId,
572    ) {
573        self.mutate_record(id, |record| {
574            record.transient_index_id = Some(transient_index_id)
575        });
576    }
577
578    /// Possibly record the beginning of statement execution, depending on a randomly-chosen value.
579    /// If the execution beginning was indeed logged, returns a `StatementLoggingId` that must be
580    /// passed to `end_statement_execution` to record when it ends.
581    ///
582    /// `lifecycle_timestamps` has timestamps that come from the Adapter frontend (`mz-pgwire`) part
583    /// of the lifecycle.
584    pub(crate) fn begin_statement_execution(
585        &mut self,
586        session: &mut Session,
587        params: &Params,
588        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
589        lifecycle_timestamps: Option<LifecycleTimestamps>,
590    ) -> Option<StatementLoggingId> {
591        let enable_internal_statement_logging = self
592            .catalog()
593            .system_config()
594            .enable_internal_statement_logging();
595        if session.user().is_internal() && !enable_internal_statement_logging {
596            return None;
597        }
598
599        let sample_rate = effective_sample_rate(session, self.catalog().system_config());
600        let use_reproducible_rng = self
601            .catalog()
602            .system_config()
603            .statement_logging_use_reproducible_rng();
604        // Only lock the RNG when we actually need reproducible sampling (tests only)
605        let sample = if use_reproducible_rng {
606            let mut rng = self
607                .statement_logging
608                .reproducible_rng
609                .lock()
610                .expect("rng lock poisoned");
611            should_sample_statement(sample_rate, Some(&mut *rng))
612        } else {
613            should_sample_statement(sample_rate, None)
614        };
615
616        // Figure out the cost of everything before we log.
617
618        // Track how many statements we're recording.
619        let sampled_label = sample.then_some("true").unwrap_or("false");
620        self.metrics
621            .statement_logging_records
622            .with_label_values(&[sampled_label])
623            .inc_by(1);
624
625        if let Some((sql, accounted)) = match session.qcell_rw(logging) {
626            PreparedStatementLoggingInfo::AlreadyLogged { .. } => None,
627            PreparedStatementLoggingInfo::StillToLog { sql, accounted, .. } => {
628                Some((sql, accounted))
629            }
630        } {
631            if !*accounted {
632                self.metrics
633                    .statement_logging_unsampled_bytes
634                    .inc_by(u64::cast_from(sql.len()));
635                if sample {
636                    self.metrics
637                        .statement_logging_actual_bytes
638                        .inc_by(u64::cast_from(sql.len()));
639                }
640                *accounted = true;
641            }
642        }
643        if !sample {
644            return None;
645        }
646
647        let (maybe_ps, ps_uuid) = self.get_prepared_statement_info(session, logging);
648
649        let began_at = if let Some(lifecycle_timestamps) = lifecycle_timestamps {
650            lifecycle_timestamps.received
651        } else {
652            self.now()
653        };
654        let now = self.now();
655        let execution_uuid = epoch_to_uuid_v7(&now);
656
657        let build_info_version = self
658            .catalog()
659            .state()
660            .config()
661            .build_info
662            .human_version(None);
663        let record = create_began_execution_record(
664            execution_uuid,
665            ps_uuid,
666            sample_rate,
667            params,
668            session,
669            began_at,
670            build_info_version,
671        );
672
673        // `mz_statement_execution_history`
674        let mseh_update = pack_statement_began_execution_update(&record);
675
676        let (maybe_ps_event, maybe_sh_event) = if let Some((ps_record, ps_event)) = maybe_ps {
677            if let Some(sh) = self
678                .statement_logging
679                .unlogged_sessions
680                .get(&ps_record.session_id)
681            {
682                (
683                    Some(ps_event),
684                    Some((Self::pack_session_history_update(sh), ps_record.session_id)),
685                )
686            } else {
687                (Some(ps_event), None)
688            }
689        } else {
690            (None, None)
691        };
692
693        let maybe_ps_prepared_statement = maybe_ps_event.as_ref().map(|e| &e.prepared_statement);
694        let maybe_ps_sql_text = maybe_ps_event.as_ref().map(|e| &e.sql_text);
695
696        if !self.statement_logging_throttling_check([
697            Some(&mseh_update),
698            maybe_ps_prepared_statement,
699            maybe_ps_sql_text,
700            maybe_sh_event.as_ref().map(|(row, _)| row),
701        ]) {
702            // Increment throttled_count in shared state
703            self.statement_logging
704                .throttling_state
705                .increment_throttled_count();
706            return None;
707        }
708        // When we successfully log the first instance of a prepared statement
709        // (i.e., it is not throttled), we also capture the number of previously
710        // throttled statement executions in the builtin prepared statement history table above,
711        // and then reset the throttled count for future tracking.
712        else if let PreparedStatementLoggingInfo::StillToLog { .. } = session.qcell_ro(logging) {
713            self.statement_logging
714                .throttling_state
715                .reset_throttled_count();
716        }
717
718        self.record_prepared_statement_as_logged(ps_uuid, session, logging);
719
720        self.record_statement_lifecycle_event(
721            &StatementLoggingId(execution_uuid),
722            &StatementLifecycleEvent::ExecutionBegan,
723            began_at,
724        );
725
726        self.statement_logging
727            .pending_statement_execution_events
728            .push((mseh_update, Diff::ONE));
729        self.statement_logging
730            .executions_begun
731            .insert(execution_uuid, record);
732
733        if let Some((sh_update, session_id)) = maybe_sh_event {
734            self.statement_logging
735                .pending_session_events
736                .push(sh_update);
737            // Mark the session as logged to avoid logging it again in the future
738            self.statement_logging.unlogged_sessions.remove(&session_id);
739        }
740        if let Some(ps_event) = maybe_ps_event {
741            self.statement_logging
742                .pending_prepared_statement_events
743                .push(ps_event);
744        }
745
746        Some(StatementLoggingId(execution_uuid))
747    }
748
749    /// Record a new connection event
750    pub(crate) fn begin_session_for_statement_logging(&mut self, session: &ConnMeta) {
751        let id = session.uuid();
752        let session_role = session.authenticated_role_id();
753        let event = SessionHistoryEvent {
754            id,
755            connected_at: session.connected_at(),
756            application_name: session.application_name().to_owned(),
757            authenticated_user: self.catalog.get_role(session_role).name.clone(),
758        };
759        self.statement_logging.unlogged_sessions.insert(id, event);
760    }
761
762    pub(crate) fn end_session_for_statement_logging(&mut self, uuid: Uuid) {
763        self.statement_logging.unlogged_sessions.remove(&uuid);
764    }
765
766    pub(crate) fn record_statement_lifecycle_event(
767        &mut self,
768        id: &StatementLoggingId,
769        event: &StatementLifecycleEvent,
770        when: EpochMillis,
771    ) {
772        if mz_adapter_types::dyncfgs::ENABLE_STATEMENT_LIFECYCLE_LOGGING
773            .get(self.catalog().system_config().dyncfgs())
774        {
775            let row = Self::pack_statement_lifecycle_event(id, event, when);
776            self.statement_logging
777                .pending_statement_lifecycle_events
778                .push(row);
779        }
780    }
781
782    /// Install watch sets for statement lifecycle logging.
783    ///
784    /// This installs both storage and compute watch sets that will fire
785    /// `StatementLifecycleEvent::StorageDependenciesFinished` and
786    /// `StatementLifecycleEvent::ComputeDependenciesFinished` respectively
787    /// when the dependencies are ready at the given timestamp.
788    pub(crate) fn install_peek_watch_sets(
789        &mut self,
790        conn_id: ConnectionId,
791        watch_set: WatchSetCreation,
792    ) -> Result<(), CollectionLookupError> {
793        let WatchSetCreation {
794            logging_id,
795            timestamp,
796            storage_ids,
797            compute_ids,
798        } = watch_set;
799
800        self.install_storage_watch_set(
801            conn_id.clone(),
802            storage_ids,
803            timestamp,
804            WatchSetResponse::StatementDependenciesReady(
805                logging_id,
806                StatementLifecycleEvent::StorageDependenciesFinished,
807            ),
808        )?;
809        self.install_compute_watch_set(
810            conn_id,
811            compute_ids,
812            timestamp,
813            WatchSetResponse::StatementDependenciesReady(
814                logging_id,
815                StatementLifecycleEvent::ComputeDependenciesFinished,
816            ),
817        )?;
818        Ok(())
819    }
820}