mz_adapter/coord/
statement_logging.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::sync::{Arc, Mutex};
12use std::time::Duration;
13
14use mz_adapter_types::connection::ConnectionId;
15use mz_compute_client::controller::error::CollectionLookupError;
16use mz_controller_types::ClusterId;
17use mz_ore::now::{EpochMillis, NowFn, epoch_to_uuid_v7, to_datetime};
18use mz_ore::task::spawn;
19use mz_ore::{cast::CastFrom, cast::CastInto};
20use mz_repr::adt::timestamp::TimestampLike;
21use mz_repr::{Datum, Diff, GlobalId, Row, Timestamp};
22use mz_sql::plan::Params;
23use mz_sql::session::metadata::SessionMetadata;
24use mz_storage_client::controller::IntrospectionType;
25use qcell::QCell;
26use rand::SeedableRng;
27use sha2::{Digest, Sha256};
28use tokio::time::MissedTickBehavior;
29use uuid::Uuid;
30
31use crate::coord::{ConnMeta, Coordinator, WatchSetResponse};
32use crate::session::{LifecycleTimestamps, Session};
33use crate::statement_logging::{
34    FrontendStatementLoggingEvent, PreparedStatementEvent, PreparedStatementLoggingInfo,
35    SessionHistoryEvent, StatementBeganExecutionRecord, StatementEndedExecutionReason,
36    StatementEndedExecutionRecord, StatementLifecycleEvent, StatementLoggingFrontend,
37    StatementLoggingId, StatementPreparedRecord, ThrottlingState, WatchSetCreation,
38    create_began_execution_record, effective_sample_rate, pack_statement_began_execution_update,
39    pack_statement_execution_inner, pack_statement_prepared_update, should_sample_statement,
40};
41
42use super::Message;
43
/// Statement logging state in the Coordinator.
///
/// Holds the bookkeeping for in-flight statement executions and not-yet-logged
/// sessions, plus the buffers of packed rows that `drain_statement_log` hands
/// to the storage controller.
#[derive(Debug)]
pub(crate) struct StatementLogging {
    /// Information about statement executions that have been logged
    /// but not finished, keyed by the execution's UUID.
    ///
    /// This map needs to have enough state left over to later retract
    /// the system table entries (so that we can update them when the
    /// execution finishes).
    executions_begun: BTreeMap<Uuid, StatementBeganExecutionRecord>,

    /// Information about sessions that have been started, but which
    /// have not yet been logged in `mz_session_history`, keyed by session UUID.
    /// They may be logged as part of a statement being executed (and chosen for logging).
    unlogged_sessions: BTreeMap<Uuid, SessionHistoryEvent>,

    /// A reproducible RNG for deciding whether to sample statement executions.
    /// Only used by tests; otherwise, `rand::rng()` is used.
    /// Controlled by the system var `statement_logging_use_reproducible_rng`.
    /// This same instance will be used by all frontend tasks.
    reproducible_rng: Arc<Mutex<rand_chacha::ChaCha8Rng>>,

    /// Events to be persisted periodically.
    ///
    /// Statement execution rows carry an explicit diff because updating an
    /// execution record is expressed as a retraction followed by an insertion.
    pending_statement_execution_events: Vec<(Row, Diff)>,
    /// Prepared statement events awaiting persistence; each one yields both a
    /// prepared statement history row and a SQL text row when drained.
    pending_prepared_statement_events: Vec<PreparedStatementEvent>,
    /// Packed session history rows awaiting persistence (always inserted with diff `+1`).
    pending_session_events: Vec<Row>,
    /// Packed statement lifecycle rows awaiting persistence (always inserted with diff `+1`).
    pending_statement_lifecycle_events: Vec<Row>,

    /// Shared throttling state for rate-limiting statement logging.
    /// The same `Arc` is handed to every `StatementLoggingFrontend`.
    pub(crate) throttling_state: Arc<ThrottlingState>,

    /// Function to get the current time.
    pub(crate) now: NowFn,
}
78
79impl StatementLogging {
80    const REPRODUCIBLE_RNG_SEED: u64 = 42;
81
82    pub(crate) fn new(now: NowFn) -> Self {
83        Self {
84            executions_begun: BTreeMap::new(),
85            unlogged_sessions: BTreeMap::new(),
86            reproducible_rng: Arc::new(Mutex::new(rand_chacha::ChaCha8Rng::seed_from_u64(
87                Self::REPRODUCIBLE_RNG_SEED,
88            ))),
89            pending_statement_execution_events: Vec::new(),
90            pending_prepared_statement_events: Vec::new(),
91            pending_session_events: Vec::new(),
92            pending_statement_lifecycle_events: Vec::new(),
93            throttling_state: Arc::new(ThrottlingState::new(&now)),
94            now,
95        }
96    }
97
98    /// Create a `StatementLoggingFrontend` for use by frontend peek sequencing.
99    ///
100    /// This provides the frontend with all the state it needs to perform statement
101    /// logging without direct access to the Coordinator.
102    pub(crate) fn create_frontend(
103        &self,
104        build_info_human_version: String,
105    ) -> StatementLoggingFrontend {
106        StatementLoggingFrontend {
107            throttling_state: Arc::clone(&self.throttling_state),
108            reproducible_rng: Arc::clone(&self.reproducible_rng),
109            build_info_human_version,
110            now: self.now.clone(),
111        }
112    }
113}
114
115impl Coordinator {
116    /// Helper to write began execution events to pending buffers.
117    /// Can be called from both old and new peek sequencing.
118    fn write_began_execution_events(
119        &mut self,
120        record: StatementBeganExecutionRecord,
121        mseh_update: Row,
122        prepared_statement: Option<PreparedStatementEvent>,
123    ) {
124        // `mz_statement_execution_history`
125        self.statement_logging
126            .pending_statement_execution_events
127            .push((mseh_update, Diff::ONE));
128
129        // Track the execution for later updates
130        self.statement_logging
131            .executions_begun
132            .insert(record.id, record);
133
134        // If we have a prepared statement, log it and possibly its session
135        if let Some(ps_event) = prepared_statement {
136            let session_id = ps_event.session_id;
137            self.statement_logging
138                .pending_prepared_statement_events
139                .push(ps_event);
140
141            // Check if we need to log the session for this prepared statement
142            if let Some(sh) = self.statement_logging.unlogged_sessions.remove(&session_id) {
143                let sh_update = Self::pack_session_history_update(&sh);
144                self.statement_logging
145                    .pending_session_events
146                    .push(sh_update);
147            }
148        }
149    }
150
151    /// Handle a statement logging event from frontend peek sequencing.
152    pub(crate) fn handle_frontend_statement_logging_event(
153        &mut self,
154        event: FrontendStatementLoggingEvent,
155    ) {
156        match event {
157            FrontendStatementLoggingEvent::BeganExecution {
158                record,
159                mseh_update,
160                prepared_statement,
161            } => {
162                self.record_statement_lifecycle_event(
163                    &StatementLoggingId(record.id),
164                    &StatementLifecycleEvent::ExecutionBegan,
165                    record.began_at,
166                );
167                self.write_began_execution_events(record, mseh_update, prepared_statement);
168            }
169            FrontendStatementLoggingEvent::EndedExecution(ended_record) => {
170                self.end_statement_execution(
171                    StatementLoggingId(ended_record.id),
172                    ended_record.reason,
173                );
174            }
175            FrontendStatementLoggingEvent::SetCluster { id, cluster_id } => {
176                self.set_statement_execution_cluster(id, cluster_id);
177            }
178            FrontendStatementLoggingEvent::SetTimestamp { id, timestamp } => {
179                self.set_statement_execution_timestamp(id, timestamp);
180            }
181            FrontendStatementLoggingEvent::SetTransientIndex {
182                id,
183                transient_index_id,
184            } => {
185                self.set_transient_index_id(id, transient_index_id);
186            }
187            FrontendStatementLoggingEvent::Lifecycle { id, event, when } => {
188                self.record_statement_lifecycle_event(&id, &event, when);
189            }
190        }
191    }
192
193    // TODO[btv] make this configurable via LD?
194    // Although... Logging every 5 seconds seems like it
195    // should have acceptable cost for now, since we do a
196    // group commit for tables every 1s anyway.
197    const STATEMENT_LOGGING_WRITE_INTERVAL: Duration = Duration::from_secs(5);
198
199    pub(crate) fn spawn_statement_logging_task(&self) {
200        let internal_cmd_tx = self.internal_cmd_tx.clone();
201        spawn(|| "statement_logging", async move {
202            let mut interval = tokio::time::interval(Coordinator::STATEMENT_LOGGING_WRITE_INTERVAL);
203            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
204            loop {
205                interval.tick().await;
206                let _ = internal_cmd_tx.send(Message::DrainStatementLog);
207            }
208        });
209    }
210
211    #[mz_ore::instrument(level = "debug")]
212    pub(crate) fn drain_statement_log(&mut self) {
213        let session_updates = std::mem::take(&mut self.statement_logging.pending_session_events)
214            .into_iter()
215            .map(|update| (update, Diff::ONE))
216            .collect();
217        let (prepared_statement_updates, sql_text_updates) =
218            std::mem::take(&mut self.statement_logging.pending_prepared_statement_events)
219                .into_iter()
220                .map(
221                    |PreparedStatementEvent {
222                         prepared_statement,
223                         sql_text,
224                         ..
225                     }| {
226                        ((prepared_statement, Diff::ONE), (sql_text, Diff::ONE))
227                    },
228                )
229                .unzip::<_, _, Vec<_>, Vec<_>>();
230        let statement_execution_updates =
231            std::mem::take(&mut self.statement_logging.pending_statement_execution_events);
232        let statement_lifecycle_updates =
233            std::mem::take(&mut self.statement_logging.pending_statement_lifecycle_events)
234                .into_iter()
235                .map(|update| (update, Diff::ONE))
236                .collect();
237
238        use IntrospectionType::*;
239        for (type_, updates) in [
240            (SessionHistory, session_updates),
241            (PreparedStatementHistory, prepared_statement_updates),
242            (StatementExecutionHistory, statement_execution_updates),
243            (StatementLifecycleHistory, statement_lifecycle_updates),
244            (SqlText, sql_text_updates),
245        ] {
246            if !updates.is_empty() && !self.controller.read_only() {
247                self.controller
248                    .storage
249                    .append_introspection_updates(type_, updates);
250            }
251        }
252    }
253
254    /// Check whether we need to do throttling (i.e., whether `STATEMENT_LOGGING_TARGET_DATA_RATE` is set).
255    /// If so, actually do the check.
256    ///
257    /// We expect `rows` to be the list of rows we intend to record and calculate the cost by summing the
258    /// byte lengths of the rows.
259    ///
260    /// Returns `false` if we must throttle this statement, and `true` otherwise.
261    fn statement_logging_throttling_check<'a, I>(&self, rows: I) -> bool
262    where
263        I: IntoIterator<Item = Option<&'a Row>>,
264    {
265        let cost = rows
266            .into_iter()
267            .filter_map(|row_opt| row_opt.map(|row| row.byte_len()))
268            .fold(0_usize, |acc, x| acc.saturating_add(x));
269
270        let Some(target_data_rate) = self
271            .catalog
272            .system_config()
273            .statement_logging_target_data_rate()
274        else {
275            return true;
276        };
277        let max_data_credit = self
278            .catalog
279            .system_config()
280            .statement_logging_max_data_credit();
281
282        self.statement_logging.throttling_state.throttling_check(
283            cost.cast_into(),
284            target_data_rate.cast_into(),
285            max_data_credit.map(CastInto::cast_into),
286            &self.statement_logging.now,
287        )
288    }
289
290    /// Marks a prepared statement as "already logged".
291    /// Mutates the `PreparedStatementLoggingInfo` metadata.
292    fn record_prepared_statement_as_logged(
293        &self,
294        uuid: Uuid,
295        session: &mut Session,
296        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
297    ) {
298        let logging = session.qcell_rw(&*logging);
299        if let PreparedStatementLoggingInfo::StillToLog { .. } = logging {
300            *logging = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
301        }
302    }
303
304    /// Returns any statement logging events needed for a particular
305    /// prepared statement. This is a read-only operation that does not mutate
306    /// the `PreparedStatementLoggingInfo` metadata.
307    ///
308    /// This function does not do a sampling check, and assumes we did so in a higher layer.
309    /// It also does not do a throttling check - that is done separately in `begin_statement_execution`.
310    ///
311    /// Returns a tuple containing:
312    /// - `Option<(StatementPreparedRecord, PreparedStatementEvent)>`: If the prepared statement
313    ///   has not yet been logged, returns the prepared statement record and the packed rows.
314    /// - `Uuid`: The UUID of the prepared statement.
315    pub(crate) fn get_prepared_statement_info(
316        &self,
317        session: &Session,
318        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
319    ) -> (
320        Option<(StatementPreparedRecord, PreparedStatementEvent)>,
321        Uuid,
322    ) {
323        let logging = session.qcell_ro(&*logging);
324
325        match logging {
326            PreparedStatementLoggingInfo::AlreadyLogged { uuid } => (None, *uuid),
327            PreparedStatementLoggingInfo::StillToLog {
328                sql,
329                redacted_sql,
330                prepared_at,
331                name,
332                session_id,
333                accounted,
334                kind,
335                _sealed: _,
336            } => {
337                assert!(
338                    *accounted,
339                    "accounting for logging should be done in `begin_statement_execution`"
340                );
341                let uuid = epoch_to_uuid_v7(prepared_at);
342                let sql_hash: [u8; 32] = Sha256::digest(sql.as_bytes()).into();
343                let record = StatementPreparedRecord {
344                    id: uuid,
345                    sql_hash,
346                    name: name.to_string(),
347                    session_id: *session_id,
348                    prepared_at: *prepared_at,
349                    kind: *kind,
350                };
351
352                // `mz_prepared_statement_history`
353                let mut mpsh_row = Row::default();
354                let mut mpsh_packer = mpsh_row.packer();
355                pack_statement_prepared_update(&record, &mut mpsh_packer);
356                let throttled_count = self
357                    .statement_logging
358                    .throttling_state
359                    .get_throttled_count();
360                mpsh_packer.push(Datum::UInt64(CastFrom::cast_from(throttled_count)));
361
362                let sql_row = Row::pack([
363                    Datum::TimestampTz(
364                        to_datetime(*prepared_at)
365                            .truncate_day()
366                            .try_into()
367                            .expect("must fit"),
368                    ),
369                    Datum::Bytes(sql_hash.as_slice()),
370                    Datum::String(sql.as_str()),
371                    Datum::String(redacted_sql.as_str()),
372                ]);
373
374                (
375                    Some((
376                        record,
377                        PreparedStatementEvent {
378                            prepared_statement: mpsh_row,
379                            sql_text: sql_row,
380                            session_id: *session_id,
381                        },
382                    )),
383                    uuid,
384                )
385            }
386        }
387    }
388
389    /// Record the end of statement execution for a statement whose beginning was logged.
390    /// It is an error to call this function for a statement whose beginning was not logged
391    /// (because it was not sampled). Requiring the opaque `StatementLoggingId` type,
392    /// which is only instantiated by `begin_statement_execution` if the statement is actually logged,
393    /// should prevent this.
394    pub(crate) fn end_statement_execution(
395        &mut self,
396        id: StatementLoggingId,
397        reason: StatementEndedExecutionReason,
398    ) {
399        let StatementLoggingId(uuid) = id;
400        let now = self.now();
401        let ended_record = StatementEndedExecutionRecord {
402            id: uuid,
403            reason,
404            ended_at: now,
405        };
406
407        let began_record = self
408            .statement_logging
409            .executions_begun
410            .remove(&uuid)
411            .expect(
412                "matched `begin_statement_execution` and `end_statement_execution` invocations",
413            );
414        for (row, diff) in
415            Self::pack_statement_ended_execution_updates(&began_record, &ended_record)
416        {
417            self.statement_logging
418                .pending_statement_execution_events
419                .push((row, diff));
420        }
421        self.record_statement_lifecycle_event(
422            &id,
423            &StatementLifecycleEvent::ExecutionFinished,
424            now,
425        );
426    }
427
428    fn pack_session_history_update(event: &SessionHistoryEvent) -> Row {
429        let SessionHistoryEvent {
430            id,
431            connected_at,
432            application_name,
433            authenticated_user,
434        } = event;
435        Row::pack_slice(&[
436            Datum::Uuid(*id),
437            Datum::TimestampTz(to_datetime(*connected_at).try_into().expect("must fit")),
438            Datum::String(&*application_name),
439            Datum::String(&*authenticated_user),
440        ])
441    }
442
443    fn pack_statement_lifecycle_event(
444        StatementLoggingId(uuid): &StatementLoggingId,
445        event: &StatementLifecycleEvent,
446        when: EpochMillis,
447    ) -> Row {
448        Row::pack_slice(&[
449            Datum::Uuid(*uuid),
450            Datum::String(event.as_str()),
451            Datum::TimestampTz(to_datetime(when).try_into().expect("must fit")),
452        ])
453    }
454
455    fn pack_full_statement_execution_update(
456        began_record: &StatementBeganExecutionRecord,
457        ended_record: &StatementEndedExecutionRecord,
458    ) -> Row {
459        let mut row = Row::default();
460        let mut packer = row.packer();
461        pack_statement_execution_inner(began_record, &mut packer);
462        let (status, error_message, result_size, rows_returned, execution_strategy) =
463            match &ended_record.reason {
464                StatementEndedExecutionReason::Success {
465                    result_size,
466                    rows_returned,
467                    execution_strategy,
468                } => (
469                    "success",
470                    None,
471                    result_size.map(|rs| i64::try_from(rs).expect("must fit")),
472                    rows_returned.map(|rr| i64::try_from(rr).expect("must fit")),
473                    execution_strategy.map(|es| es.name()),
474                ),
475                StatementEndedExecutionReason::Canceled => ("canceled", None, None, None, None),
476                StatementEndedExecutionReason::Errored { error } => {
477                    ("error", Some(error.as_str()), None, None, None)
478                }
479                StatementEndedExecutionReason::Aborted => ("aborted", None, None, None, None),
480            };
481        packer.extend([
482            Datum::TimestampTz(
483                to_datetime(ended_record.ended_at)
484                    .try_into()
485                    .expect("Sane system time"),
486            ),
487            status.into(),
488            error_message.into(),
489            result_size.into(),
490            rows_returned.into(),
491            execution_strategy.into(),
492        ]);
493        row
494    }
495
496    fn pack_statement_ended_execution_updates(
497        began_record: &StatementBeganExecutionRecord,
498        ended_record: &StatementEndedExecutionRecord,
499    ) -> [(Row, Diff); 2] {
500        let retraction = pack_statement_began_execution_update(began_record);
501        let new = Self::pack_full_statement_execution_update(began_record, ended_record);
502        [(retraction, Diff::MINUS_ONE), (new, Diff::ONE)]
503    }
504
505    /// Mutate a statement execution record via the given function `f`.
506    fn mutate_record<F: FnOnce(&mut StatementBeganExecutionRecord)>(
507        &mut self,
508        StatementLoggingId(id): StatementLoggingId,
509        f: F,
510    ) {
511        let record = self
512            .statement_logging
513            .executions_begun
514            .get_mut(&id)
515            .expect("mutate_record must not be called after execution ends");
516        let retraction = pack_statement_began_execution_update(record);
517        self.statement_logging
518            .pending_statement_execution_events
519            .push((retraction, Diff::MINUS_ONE));
520        f(record);
521        let update = pack_statement_began_execution_update(record);
522        self.statement_logging
523            .pending_statement_execution_events
524            .push((update, Diff::ONE));
525    }
526
527    /// Set the `cluster_id` for a statement, once it's known.
528    ///
529    /// TODO(peek-seq): We could do cluster resolution and packing in the frontend task, and just
530    /// send over the rows.
531    pub(crate) fn set_statement_execution_cluster(
532        &mut self,
533        id: StatementLoggingId,
534        cluster_id: ClusterId,
535    ) {
536        let cluster_name = self.catalog().get_cluster(cluster_id).name.clone();
537        self.mutate_record(id, |record| {
538            record.cluster_name = Some(cluster_name);
539            record.cluster_id = Some(cluster_id);
540        });
541    }
542
543    /// Set the `execution_timestamp` for a statement, once it's known
544    pub(crate) fn set_statement_execution_timestamp(
545        &mut self,
546        id: StatementLoggingId,
547        timestamp: Timestamp,
548    ) {
549        self.mutate_record(id, |record| {
550            record.execution_timestamp = Some(u64::from(timestamp));
551        });
552    }
553
554    pub(crate) fn set_transient_index_id(
555        &mut self,
556        id: StatementLoggingId,
557        transient_index_id: GlobalId,
558    ) {
559        self.mutate_record(id, |record| {
560            record.transient_index_id = Some(transient_index_id)
561        });
562    }
563
564    /// Possibly record the beginning of statement execution, depending on a randomly-chosen value.
565    /// If the execution beginning was indeed logged, returns a `StatementLoggingId` that must be
566    /// passed to `end_statement_execution` to record when it ends.
567    ///
568    /// `lifecycle_timestamps` has timestamps that come from the Adapter frontend (`mz-pgwire`) part
569    /// of the lifecycle.
570    pub(crate) fn begin_statement_execution(
571        &mut self,
572        session: &mut Session,
573        params: &Params,
574        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
575        lifecycle_timestamps: Option<LifecycleTimestamps>,
576    ) -> Option<StatementLoggingId> {
577        let enable_internal_statement_logging = self
578            .catalog()
579            .system_config()
580            .enable_internal_statement_logging();
581        if session.user().is_internal() && !enable_internal_statement_logging {
582            return None;
583        }
584
585        let sample_rate = effective_sample_rate(session, self.catalog().system_config());
586        let use_reproducible_rng = self
587            .catalog()
588            .system_config()
589            .statement_logging_use_reproducible_rng();
590        // Only lock the RNG when we actually need reproducible sampling (tests only)
591        let sample = if use_reproducible_rng {
592            let mut rng = self
593                .statement_logging
594                .reproducible_rng
595                .lock()
596                .expect("rng lock poisoned");
597            should_sample_statement(sample_rate, Some(&mut *rng))
598        } else {
599            should_sample_statement(sample_rate, None)
600        };
601
602        // Figure out the cost of everything before we log.
603
604        // Track how many statements we're recording.
605        let sampled_label = sample.then_some("true").unwrap_or("false");
606        self.metrics
607            .statement_logging_records
608            .with_label_values(&[sampled_label])
609            .inc_by(1);
610
611        if let Some((sql, accounted)) = match session.qcell_rw(logging) {
612            PreparedStatementLoggingInfo::AlreadyLogged { .. } => None,
613            PreparedStatementLoggingInfo::StillToLog { sql, accounted, .. } => {
614                Some((sql, accounted))
615            }
616        } {
617            if !*accounted {
618                self.metrics
619                    .statement_logging_unsampled_bytes
620                    .inc_by(u64::cast_from(sql.len()));
621                if sample {
622                    self.metrics
623                        .statement_logging_actual_bytes
624                        .inc_by(u64::cast_from(sql.len()));
625                }
626                *accounted = true;
627            }
628        }
629        if !sample {
630            return None;
631        }
632
633        let (maybe_ps, ps_uuid) = self.get_prepared_statement_info(session, logging);
634
635        let began_at = if let Some(lifecycle_timestamps) = lifecycle_timestamps {
636            lifecycle_timestamps.received
637        } else {
638            self.now()
639        };
640        let now = self.now();
641        let execution_uuid = epoch_to_uuid_v7(&now);
642
643        let build_info_version = self
644            .catalog()
645            .state()
646            .config()
647            .build_info
648            .human_version(None);
649        let record = create_began_execution_record(
650            execution_uuid,
651            ps_uuid,
652            sample_rate,
653            params,
654            session,
655            began_at,
656            build_info_version,
657        );
658
659        // `mz_statement_execution_history`
660        let mseh_update = pack_statement_began_execution_update(&record);
661
662        let (maybe_ps_event, maybe_sh_event) = if let Some((ps_record, ps_event)) = maybe_ps {
663            if let Some(sh) = self
664                .statement_logging
665                .unlogged_sessions
666                .get(&ps_record.session_id)
667            {
668                (
669                    Some(ps_event),
670                    Some((Self::pack_session_history_update(sh), ps_record.session_id)),
671                )
672            } else {
673                (Some(ps_event), None)
674            }
675        } else {
676            (None, None)
677        };
678
679        let maybe_ps_prepared_statement = maybe_ps_event.as_ref().map(|e| &e.prepared_statement);
680        let maybe_ps_sql_text = maybe_ps_event.as_ref().map(|e| &e.sql_text);
681
682        if !self.statement_logging_throttling_check([
683            Some(&mseh_update),
684            maybe_ps_prepared_statement,
685            maybe_ps_sql_text,
686            maybe_sh_event.as_ref().map(|(row, _)| row),
687        ]) {
688            // Increment throttled_count in shared state
689            self.statement_logging
690                .throttling_state
691                .increment_throttled_count();
692            return None;
693        }
694        // When we successfully log the first instance of a prepared statement
695        // (i.e., it is not throttled), we also capture the number of previously
696        // throttled statement executions in the builtin prepared statement history table above,
697        // and then reset the throttled count for future tracking.
698        else if let PreparedStatementLoggingInfo::StillToLog { .. } = session.qcell_ro(logging) {
699            self.statement_logging
700                .throttling_state
701                .reset_throttled_count();
702        }
703
704        self.record_prepared_statement_as_logged(ps_uuid, session, logging);
705
706        self.record_statement_lifecycle_event(
707            &StatementLoggingId(execution_uuid),
708            &StatementLifecycleEvent::ExecutionBegan,
709            began_at,
710        );
711
712        self.statement_logging
713            .pending_statement_execution_events
714            .push((mseh_update, Diff::ONE));
715        self.statement_logging
716            .executions_begun
717            .insert(execution_uuid, record);
718
719        if let Some((sh_update, session_id)) = maybe_sh_event {
720            self.statement_logging
721                .pending_session_events
722                .push(sh_update);
723            // Mark the session as logged to avoid logging it again in the future
724            self.statement_logging.unlogged_sessions.remove(&session_id);
725        }
726        if let Some(ps_event) = maybe_ps_event {
727            self.statement_logging
728                .pending_prepared_statement_events
729                .push(ps_event);
730        }
731
732        Some(StatementLoggingId(execution_uuid))
733    }
734
735    /// Record a new connection event
736    pub(crate) fn begin_session_for_statement_logging(&mut self, session: &ConnMeta) {
737        let id = session.uuid();
738        let session_role = session.authenticated_role_id();
739        let event = SessionHistoryEvent {
740            id,
741            connected_at: session.connected_at(),
742            application_name: session.application_name().to_owned(),
743            authenticated_user: self.catalog.get_role(session_role).name.clone(),
744        };
745        self.statement_logging.unlogged_sessions.insert(id, event);
746    }
747
748    pub(crate) fn end_session_for_statement_logging(&mut self, uuid: Uuid) {
749        self.statement_logging.unlogged_sessions.remove(&uuid);
750    }
751
752    pub(crate) fn record_statement_lifecycle_event(
753        &mut self,
754        id: &StatementLoggingId,
755        event: &StatementLifecycleEvent,
756        when: EpochMillis,
757    ) {
758        if mz_adapter_types::dyncfgs::ENABLE_STATEMENT_LIFECYCLE_LOGGING
759            .get(self.catalog().system_config().dyncfgs())
760        {
761            let row = Self::pack_statement_lifecycle_event(id, event, when);
762            self.statement_logging
763                .pending_statement_lifecycle_events
764                .push(row);
765        }
766    }
767
768    /// Install watch sets for statement lifecycle logging.
769    ///
770    /// This installs both storage and compute watch sets that will fire
771    /// `StatementLifecycleEvent::StorageDependenciesFinished` and
772    /// `StatementLifecycleEvent::ComputeDependenciesFinished` respectively
773    /// when the dependencies are ready at the given timestamp.
774    pub(crate) fn install_peek_watch_sets(
775        &mut self,
776        conn_id: ConnectionId,
777        watch_set: WatchSetCreation,
778    ) -> Result<(), CollectionLookupError> {
779        let WatchSetCreation {
780            logging_id,
781            timestamp,
782            storage_ids,
783            compute_ids,
784        } = watch_set;
785
786        self.install_storage_watch_set(
787            conn_id.clone(),
788            storage_ids,
789            timestamp,
790            WatchSetResponse::StatementDependenciesReady(
791                logging_id,
792                StatementLifecycleEvent::StorageDependenciesFinished,
793            ),
794        )?;
795        self.install_compute_watch_set(
796            conn_id,
797            compute_ids,
798            timestamp,
799            WatchSetResponse::StatementDependenciesReady(
800                logging_id,
801                StatementLifecycleEvent::ComputeDependenciesFinished,
802            ),
803        )?;
804        Ok(())
805    }
806}