mz_adapter/coord/appends.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Logic and types for all appends executed by the [`Coordinator`].

use std::collections::{BTreeMap, BTreeSet};
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, LazyLock};
use std::time::{Duration, Instant};

use derivative::Derivative;
use futures::future::{BoxFuture, FutureExt};
use mz_adapter_types::connection::ConnectionId;
use mz_catalog::builtin::{BuiltinTable, MZ_SESSIONS};
use mz_expr::CollectionPlan;
use mz_ore::metrics::MetricsFutureExt;
use mz_ore::task;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{assert_none, instrument};
use mz_repr::{CatalogItemId, Timestamp};
use mz_sql::names::ResolvedIds;
use mz_sql::plan::{ExplainPlanPlan, ExplainTimestampPlan, Explainee, ExplaineeStatement, Plan};
use mz_sql::session::metadata::SessionMetadata;
use mz_storage_client::client::TableData;
use mz_timestamp_oracle::WriteTimestamp;
use smallvec::SmallVec;
use tokio::sync::{Notify, OwnedMutexGuard, OwnedSemaphorePermit, Semaphore, oneshot};
use tracing::{Instrument, Span, debug_span, info, warn};

use crate::catalog::{BuiltinTableUpdate, Catalog};
use crate::coord::{Coordinator, Message, PendingTxn, PlanValidity};
use crate::session::{GroupCommitWriteLocks, Session, WriteLocks};
use crate::util::{CompletedClientTransmitter, ResultExt};
use crate::{AdapterError, ExecuteContext};

/// Tables that we emit updates for when starting a new session.
pub(crate) static REQUIRED_BUILTIN_TABLES: &[&LazyLock<BuiltinTable>] = &[&MZ_SESSIONS];

/// An operation that was deferred while waiting on a resource to become available.
///
/// For example, when inserting into a table we defer on acquiring [`WriteLocks`].
#[derive(Debug)]
pub enum DeferredOp {
    /// A plan, e.g. ReadThenWrite, that needs locks before sequencing.
    Plan(DeferredPlan),
    /// Inserts into a collection.
    Write(DeferredWrite),
}

impl DeferredOp {
    /// Certain operations, e.g. "blind writes"/`INSERT` statements, can be optimistically retried
    /// because we can share a write lock between multiple operations. In this case we wait to
    /// acquire the locks until [`group_commit`], where writes are grouped by collection and
    /// committed at a single timestamp.
    ///
    /// Other operations, e.g. read-then-write plans/`UPDATE` statements, must uniquely hold their
    /// write locks, so we acquire the locks in [`try_deferred`] to prevent multiple queued plans
    /// from being retried at the same time when we know only one can proceed.
    ///
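    /// A quick illustration (not a doctest, since constructing these ops requires a live
    /// `ExecuteContext`; `plan` and `write` here are hypothetical values):
    ///
    /// ```ignore
    /// // Blind writes share a lock at group commit time, so retry them eagerly.
    /// assert!(DeferredOp::Write(write).can_be_optimistically_retried());
    /// // Read-then-write plans need their locks exclusively, so retry one at a time.
    /// assert!(!DeferredOp::Plan(plan).can_be_optimistically_retried());
    /// ```
    ///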
    /// [`try_deferred`]: crate::coord::Coordinator::try_deferred
    /// [`group_commit`]: crate::coord::Coordinator::group_commit
    pub(crate) fn can_be_optimistically_retried(&self) -> bool {
        match self {
            DeferredOp::Plan(_) => false,
            DeferredOp::Write(_) => true,
        }
    }

    /// Returns an iterator over all of the locks required by the current operation.
    pub fn required_locks(&self) -> impl Iterator<Item = CatalogItemId> + '_ {
        match self {
            DeferredOp::Plan(plan) => {
                let iter = plan.requires_locks.iter().copied();
                itertools::Either::Left(iter)
            }
            DeferredOp::Write(write) => {
                let iter = write.writes.keys().copied();
                itertools::Either::Right(iter)
            }
        }
    }

    /// Returns the [`ConnectionId`] associated with this deferred op.
    pub fn conn_id(&self) -> &ConnectionId {
        match self {
            DeferredOp::Plan(plan) => plan.ctx.session().conn_id(),
            DeferredOp::Write(write) => write.pending_txn.ctx.session().conn_id(),
        }
    }

    /// Consumes the [`DeferredOp`], returning the inner [`ExecuteContext`].
    pub fn into_ctx(self) -> ExecuteContext {
        match self {
            DeferredOp::Plan(plan) => plan.ctx,
            DeferredOp::Write(write) => write.pending_txn.ctx,
        }
    }
}

/// Describes a plan that is awaiting [`WriteLocks`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct DeferredPlan {
    #[derivative(Debug = "ignore")]
    pub ctx: ExecuteContext,
    pub plan: Plan,
    pub validity: PlanValidity,
    pub requires_locks: BTreeSet<CatalogItemId>,
}

#[derive(Debug)]
pub struct DeferredWrite {
    pub span: Span,
    pub writes: BTreeMap<CatalogItemId, SmallVec<[TableData; 1]>>,
    pub pending_txn: PendingTxn,
}

/// Describes what action triggered an update to a builtin table.
#[derive(Debug)]
pub(crate) enum BuiltinTableUpdateSource {
    /// Internal update; notify the caller when it's complete.
    Internal(oneshot::Sender<()>),
    /// Update was triggered by some background process, such as periodic heartbeats from COMPUTE.
    Background(oneshot::Sender<()>),
}

/// A pending write transaction that will be committed during the next group commit.
#[derive(Debug)]
pub(crate) enum PendingWriteTxn {
    /// Write to a user table.
    User {
        span: Span,
        /// List of all write operations within the transaction.
        writes: BTreeMap<CatalogItemId, SmallVec<[TableData; 1]>>,
        /// If they exist, should contain locks for each [`CatalogItemId`] in `writes`.
        write_locks: Option<WriteLocks>,
        /// Inner transaction.
        pending_txn: PendingTxn,
    },
    /// Write to a system table.
    System {
        updates: Vec<BuiltinTableUpdate>,
        source: BuiltinTableUpdateSource,
    },
}

impl PendingWriteTxn {
    fn is_internal_system(&self) -> bool {
        match self {
            PendingWriteTxn::System {
                source: BuiltinTableUpdateSource::Internal(_),
                ..
            } => true,
            _ => false,
        }
    }
}

impl Coordinator {
    /// Send a message to the Coordinator to start a group commit.
    pub(crate) fn trigger_group_commit(&mut self) {
        self.group_commit_tx.notify();
        // Avoid excessive `Message::GroupCommitInitiate` by resetting the periodic table
        // advancement. The group commit triggered by the message above will already advance all
        // tables.
        self.advance_timelines_interval.reset();
    }

    /// Tries to execute a previously deferred operation ([`DeferredOp`]) that requires write
    /// locks.
    ///
    /// If we can't acquire all of the write locks then we'll defer the plan again and wait for
    /// the necessary locks to become available.
    pub(crate) async fn try_deferred(
        &mut self,
        conn_id: ConnectionId,
        acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
    ) {
        // Try getting the deferred op; it may have already been canceled.
        let Some(op) = self.deferred_write_ops.remove(&conn_id) else {
            tracing::warn!(%conn_id, "no deferred op found, it must have been canceled?");
            return;
        };
        tracing::info!(%conn_id, "trying deferred plan");

        // If we pre-acquired a lock, try to acquire the rest.
        let write_locks = match acquired_lock {
            Some((acquired_gid, acquired_lock)) => {
                let mut write_locks = WriteLocks::builder(op.required_locks());

                // Insert the one lock we already acquired into our builder.
                write_locks.insert_lock(acquired_gid, acquired_lock);

                // Acquire the rest of our locks, filtering out the one we already have.
                for gid in op.required_locks().filter(|gid| *gid != acquired_gid) {
                    if let Some(lock) = self.try_grant_object_write_lock(gid) {
                        write_locks.insert_lock(gid, lock);
                    }
                }

                // If we failed to acquire any of the locks, spawn a task that waits for them to
                // become available.
                let locks = match write_locks.all_or_nothing(op.conn_id()) {
                    Ok(locks) => locks,
                    Err(failed_to_acquire) => {
                        let acquire_future = self
                            .grant_object_write_lock(failed_to_acquire)
                            .map(Option::Some);
                        self.defer_op(acquire_future, op);
                        return;
                    }
                };

                Some(locks)
            }
            None => None,
        };

        match op {
            DeferredOp::Plan(mut deferred) => {
                if let Err(e) = deferred.validity.check(self.catalog()) {
                    deferred.ctx.retire(Err(e))
                } else {
                    // Write statements never need to track resolved IDs (NOTE: This is not the
                    // same thing as plan dependencies, which we do need to re-validate).
                    let resolved_ids = ResolvedIds::empty();

                    // If we pre-acquired our locks, grant them to the session.
                    if let Some(locks) = write_locks {
                        let conn_id = deferred.ctx.session().conn_id().clone();
                        if let Err(existing) =
                            deferred.ctx.session_mut().try_grant_write_locks(locks)
                        {
                            tracing::error!(
                                %conn_id,
                                ?existing,
                                "session already has write locks granted?",
                            );
                            return deferred.ctx.retire(Err(AdapterError::WrongSetOfLocks));
                        }
                    };

                    // Note: This plan is not guaranteed to run, it may get deferred again.
                    self.sequence_plan(deferred.ctx, deferred.plan, resolved_ids)
                        .await;
                }
            }
            DeferredOp::Write(DeferredWrite {
                span,
                writes,
                pending_txn,
            }) => {
                self.submit_write(PendingWriteTxn::User {
                    span,
                    writes,
                    write_locks,
                    pending_txn,
                });
            }
        }
    }

    /// Attempts to commit all pending write transactions in a group commit. If the timestamp
    /// chosen for the writes is not ahead of `now()`, then we can execute and commit the writes
    /// immediately. Otherwise we must wait for `now()` to advance past the timestamp chosen for the
    /// writes.
    #[instrument(level = "debug")]
    pub(crate) async fn try_group_commit(&mut self, permit: Option<GroupCommitPermit>) {
        let timestamp = self.peek_local_write_ts().await;
        let now = Timestamp::from((self.catalog().config().now)());

        // HACK: This is a special case to allow writes to the mz_sessions table to proceed even
        // if the timestamp oracle is ahead of the current walltime. We do this because there are
        // some tests that mock the walltime, so it doesn't automatically advance, and updating
        // those tests to advance the walltime while creating a connection is too much.
        //
        // TODO(parkmycar): Get rid of the check below when refactoring group commits.
        let contains_internal_system_write = self
            .pending_writes
            .iter()
            .any(|write| write.is_internal_system());

        if timestamp > now && !contains_internal_system_write {
            // Cap the retry time at 1s. In cases where the system clock has retreated by a large
            // amount of time, this prevents us from waiting that full duration if the clock then
            // advances back to near where it was.
            let remaining_ms = std::cmp::min(timestamp.saturating_sub(now), 1_000.into());
            let internal_cmd_tx = self.internal_cmd_tx.clone();
            task::spawn(
                || "group_commit_initiate",
                async move {
                    tokio::time::sleep(Duration::from_millis(remaining_ms.into())).await;
                    // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
                    let result =
                        internal_cmd_tx.send(Message::GroupCommitInitiate(Span::current(), permit));
                    if let Err(e) = result {
                        warn!("internal_cmd_rx dropped before we could send: {:?}", e);
                    }
                }
                .instrument(Span::current()),
            );
        } else {
            self.group_commit(permit).await;
        }
    }

    /// Tries to commit all pending write transactions at the same timestamp.
    ///
    /// If the caller of this function has the `write_lock` acquired, then they can optionally pass
    /// it in to this method. If the caller does not have the `write_lock` acquired and the
    /// `write_lock` is currently locked by another operation, then only writes to system tables
    /// and table advancements will be applied. If the caller does not have the `write_lock`
    /// acquired and the `write_lock` is not currently locked by another operation, then group
    /// commit will acquire it and all writes will be applied.
    ///
    /// All applicable pending writes will be combined into a single Append command and sent to
    /// STORAGE as a single batch. All applicable writes will happen at the same timestamp and all
    /// involved tables will be advanced to some timestamp larger than the timestamp of the write.
    ///
    /// Returns the timestamp of the write.
    #[instrument(name = "coord::group_commit")]
    pub(crate) async fn group_commit(&mut self, permit: Option<GroupCommitPermit>) -> Timestamp {
        let mut validated_writes = Vec::new();
        let mut deferred_writes = Vec::new();
        let mut group_write_locks = GroupCommitWriteLocks::default();

        // TODO(parkmycar): Refactor away this allocation. Currently `drain(..)` requires holding
        // a mutable borrow on the Coordinator and so does trying to grant a write lock.
        let pending_writes: Vec<_> = self.pending_writes.drain(..).collect();

        // Validate, merge, and possibly acquire write locks for as many pending writes as possible.
        for pending_write in pending_writes {
            match pending_write {
                // We always allow system writes to proceed.
                PendingWriteTxn::System { .. } => validated_writes.push(pending_write),
                // We have a set of locks! Validate that they're the expected set.
                PendingWriteTxn::User {
                    span,
                    write_locks: Some(write_locks),
                    writes,
                    pending_txn,
                } => match write_locks.validate(writes.keys().copied()) {
                    Ok(validated_locks) => {
                        // Merge all of our write locks together since we can allow concurrent
                        // writes at the same timestamp.
                        group_write_locks.merge(validated_locks);

                        let validated_write = PendingWriteTxn::User {
                            span,
                            writes,
                            write_locks: None,
                            pending_txn,
                        };
                        validated_writes.push(validated_write);
                    }
                    // This is very unexpected, since callers of this method should have validated
                    // the locks beforehand.
                    //
                    // We cannot allow these writes to occur: if the correct set of locks was not
                    // taken we could violate serializability.
                    Err(missing) => {
                        let writes: Vec<_> = writes.keys().collect();
                        panic!(
                            "got to group commit with partial set of locks!\nmissing: {:?}, writes: {:?}, txn: {:?}",
                            missing, writes, pending_txn,
                        );
                    }
                },
                // If we don't have any locks, try to acquire them, otherwise defer the write.
                PendingWriteTxn::User {
                    span,
                    writes,
                    write_locks: None,
                    pending_txn,
                } => {
                    let missing = group_write_locks.missing_locks(writes.keys().copied());

                    if missing.is_empty() {
                        // We have all the locks! Queue the pending write.
                        let validated_write = PendingWriteTxn::User {
                            span,
                            writes,
                            write_locks: None,
                            pending_txn,
                        };
                        validated_writes.push(validated_write);
                    } else {
                        // Try to acquire the locks we're missing.
                        let mut just_in_time_locks = WriteLocks::builder(missing.clone());
                        for collection in missing {
                            if let Some(lock) = self.try_grant_object_write_lock(collection) {
                                just_in_time_locks.insert_lock(collection, lock);
                            }
                        }

                        match just_in_time_locks.all_or_nothing(pending_txn.ctx.session().conn_id())
                        {
                            // We acquired all of the locks! Proceed with the write.
                            Ok(locks) => {
                                group_write_locks.merge(locks);
                                let validated_write = PendingWriteTxn::User {
                                    span,
                                    writes,
                                    write_locks: None,
                                    pending_txn,
                                };
                                validated_writes.push(validated_write);
                            }
                            // Darn. We couldn't acquire the locks, so defer the write.
                            Err(missing) => {
                                let acquire_future =
                                    self.grant_object_write_lock(missing).map(Option::Some);
                                let write = DeferredWrite {
                                    span,
                                    writes,
                                    pending_txn,
                                };
                                deferred_writes.push((acquire_future, write));
                            }
                        }
                    }
                }
            }
        }

        // Queue all of our deferred ops.
        for (acquire_future, write) in deferred_writes {
            self.defer_op(acquire_future, DeferredOp::Write(write));
        }

        // The value returned here still might be ahead of `now()` if `now()` has gone backwards at
        // any point during this method or if this was triggered from DDL. We will still commit the
        // write without waiting for `now()` to advance. This is ok because the next batch of writes
        // will trigger the wait loop in `try_group_commit()` if `now()` hasn't advanced past the
        // global timeline, preventing an unbounded advancing of the global timeline ahead of
        // `now()`. Additionally DDL is infrequent enough and takes long enough that we don't think
        // it's practical for continuous DDL to advance the global timestamp in an unbounded manner.
        let WriteTimestamp {
            timestamp,
            advance_to,
        } = self.get_local_write_ts().await;

        // While we're flipping on the feature flags for txn-wal tables and
        // the separated Postgres timestamp oracle, we also need to confirm
        // leadership on writes _after_ getting the timestamp and _before_
        // writing anything to table shards.
        //
        // TODO: Remove this after both (either?) of the above features are on
        // for good and no possibility of running the old code.
        let confirm_leadership_start = Instant::now();
        let () = self
            .catalog
            .confirm_leadership()
            .await
            .unwrap_or_terminate("unable to confirm leadership");
        self.metrics
            .group_commit_confirm_leadership_seconds
            .observe(confirm_leadership_start.elapsed().as_secs_f64());

        let mut appends: BTreeMap<CatalogItemId, SmallVec<[TableData; 1]>> = BTreeMap::new();
        let mut responses = Vec::with_capacity(validated_writes.len());
        let mut notifies = Vec::new();

        for validated_write_txn in validated_writes {
            match validated_write_txn {
                PendingWriteTxn::User {
                    span: _,
                    writes,
                    write_locks,
                    pending_txn:
                        PendingTxn {
                            ctx,
                            response,
                            action,
                        },
                } => {
                    assert_none!(write_locks, "should have merged together all locks above");
                    for (id, table_data) in writes {
                        // If the table that some write was targeting has been deleted while the
                        // write was waiting, then the write will be ignored and we respond to the
                        // client that the write was successful. This is only possible if the write
                        // and the delete were concurrent. Therefore, we are free to order the
                        // write before the delete without violating any consistency guarantees.
                        if self.catalog().try_get_entry(&id).is_some() {
                            appends.entry(id).or_default().extend(table_data);
                        }
                    }
                    if let Some(id) = ctx.extra().contents() {
                        self.set_statement_execution_timestamp(id, timestamp);
                    }

                    responses.push(CompletedClientTransmitter::new(ctx, response, action));
                }
                PendingWriteTxn::System { updates, source } => {
                    for update in updates {
                        appends.entry(update.id).or_default().push(update.data);
                    }
                    // Once the write completes we notify any waiters.
                    match source {
                        BuiltinTableUpdateSource::Internal(tx)
                        | BuiltinTableUpdateSource::Background(tx) => notifies.push(tx),
                    }
                }
            }
        }

        // Add table advancements for all tables.
        let table_advancement_start = Instant::now();
        for table in self.catalog().entries().filter(|entry| entry.is_table()) {
            appends.entry(table.id()).or_default();
        }
        self.metrics
            .group_commit_table_advancement_seconds
            .observe(table_advancement_start.elapsed().as_secs_f64());

        // Consolidate all Rows for a given table. We do not consolidate the
        // staged batches; that's up to whoever staged them.
        let mut all_appends = Vec::with_capacity(appends.len());
        for (item_id, table_data) in appends.into_iter() {
            let mut all_rows = Vec::new();
            let mut all_data = Vec::new();
            for data in table_data {
                match data {
                    TableData::Rows(rows) => all_rows.extend(rows),
                    TableData::Batches(_) => all_data.push(data),
                }
            }
            differential_dataflow::consolidation::consolidate(&mut all_rows);
            all_data.push(TableData::Rows(all_rows));

            // TODO(parkmycar): Use SmallVec throughout.
            all_appends.push((item_id, all_data));
        }

        let appends: Vec<_> = all_appends
            .into_iter()
            .map(|(id, updates)| {
                let gid = self.catalog().get_entry(&id).latest_global_id();
                (gid, updates)
            })
            .collect();

        // Log non-empty user appends.
        let modified_tables: Vec<_> = appends
            .iter()
            .filter_map(|(id, updates)| {
                if id.is_user() && !updates.iter().all(|u| u.is_empty()) {
                    Some(id)
                } else {
                    None
                }
            })
            .collect();
        if !modified_tables.is_empty() {
            info!(
                "Appending to tables, {modified_tables:?}, at {timestamp}, advancing to {advance_to}"
            );
        }
        // Instrument our table writes since they can block the coordinator.
        let histogram = self.metrics.append_table_duration_seconds.clone();
        let append_fut = self
            .controller
            .storage
            .append_table(timestamp, advance_to, appends)
            .expect("invalid updates")
            .wall_time()
            .observe(histogram);

        // Spawn a task to do the table writes.
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let apply_write_fut = self.apply_local_write(timestamp);

        let span = debug_span!(parent: None, "group_commit_apply");
        OpenTelemetryContext::obtain().attach_as_parent_to(&span);
        task::spawn(
            || "group_commit_apply",
            async move {
                // Wait for the writes to complete.
                match append_fut
                    .instrument(debug_span!("group_commit_apply::append_fut"))
                    .await
                {
                    Ok(append_result) => {
                        append_result.unwrap_or_terminate("cannot fail to apply appends")
                    }
                    Err(_) => warn!("Writer terminated with writes in indefinite state"),
                };

                // Apply the write by marking the timestamp as complete on the timeline.
                apply_write_fut
                    .instrument(debug_span!("group_commit_apply::append_write_fut"))
                    .await;

                // Notify the external clients of the result.
                for response in responses {
                    let (mut ctx, result) = response.finalize();
                    ctx.session_mut().apply_write(timestamp);
                    ctx.retire(result);
                }

                // IMPORTANT: Make sure we hold the permit and write locks
                // until here, to prevent other writes from going through while
                // we haven't yet applied the write at the timestamp oracle.
                drop(permit);
                drop(group_write_locks);

                // Advance other timelines.
                if let Err(e) = internal_cmd_tx.send(Message::AdvanceTimelines) {
                    warn!("Server closed with non-advanced timelines, {e}");
                }

                for notify in notifies {
                    // We don't care if the listeners have gone away.
                    let _ = notify.send(());
                }
            }
            .instrument(span),
        );

        timestamp
    }

    /// Submit a write to be executed during the next group commit and trigger a group commit.
    pub(crate) fn submit_write(&mut self, pending_write_txn: PendingWriteTxn) {
        if self.controller.read_only() {
            panic!(
                "attempting table write in read-only mode: {:?}",
                pending_write_txn
            );
        }
        self.pending_writes.push(pending_write_txn);
        self.trigger_group_commit();
    }

    /// Append some [`BuiltinTableUpdate`]s, with various degrees of waiting and blocking.
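    ///
    /// A minimal usage sketch (illustrative only, not a doctest, since it needs a live
    /// `Coordinator`):
    ///
    /// ```ignore
    /// // Queue the updates, trigger a group commit, and wait for the write to land,
    /// // all without blocking the coordinator's main loop.
    /// let (notify, _write_ts) = coord.builtin_table_update().execute(updates).await;
    /// notify.await;
    /// ```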
    pub(crate) fn builtin_table_update<'a>(&'a mut self) -> BuiltinTableAppend<'a> {
        BuiltinTableAppend { coord: self }
    }

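    /// Defers `op` until `acquire_future` resolves, at which point we message the Coordinator to
    /// retry the operation via [`Coordinator::try_deferred`].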
    pub(crate) fn defer_op<F>(&mut self, acquire_future: F, op: DeferredOp)
    where
        F: Future<Output = Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>>
            + Send
            + 'static,
    {
        let conn_id = op.conn_id().clone();

        // Track all of our deferred ops.
        let is_optimistic = op.can_be_optimistically_retried();
        self.deferred_write_ops.insert(conn_id.clone(), op);

        let internal_cmd_tx = self.internal_cmd_tx.clone();
        let conn_id_ = conn_id.clone();
        mz_ore::task::spawn(|| format!("defer op {conn_id_}"), async move {
            tracing::info!(%conn_id, "deferring plan");
            // Once we can acquire the first failed lock, try running the deferred plan.
            //
            // Note: This does not guarantee the plan will be able to run, there might be
            // other locks that we later fail to get.
            let acquired_lock = acquire_future.await;

            // Some operations, e.g. blind INSERTs, can be optimistically retried, meaning we
            // can run multiple at once. In those cases we don't hold the lock so we retry all
            // blind writes for a single object.
            let acquired_lock = match (acquired_lock, is_optimistic) {
                (Some(_lock), true) => None,
                (Some(lock), false) => Some(lock),
                (None, _) => None,
            };

            // If this send fails then the Coordinator is shutting down.
            let _ = internal_cmd_tx.send(Message::TryDeferred {
                conn_id,
                acquired_lock,
            });
        });
    }

    /// Returns a future that waits until it can get an exclusive lock on the specified collection.
    pub(crate) fn grant_object_write_lock(
        &mut self,
        object_id: CatalogItemId,
    ) -> impl Future<Output = (CatalogItemId, OwnedMutexGuard<()>)> + 'static {
        let write_lock_handle = self
            .write_locks
            .entry(object_id)
            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())));
        let write_lock_handle = Arc::clone(write_lock_handle);

        write_lock_handle
            .lock_owned()
            .map(move |guard| (object_id, guard))
    }

    /// Lazily creates the lock for the provided `object_id` and grants it if possible; returns
    /// `None` if the lock is already held.
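    ///
    /// A sketch of the intended fast-path/slow-path split (illustrative only; `proceed` and
    /// `defer` are hypothetical helpers):
    ///
    /// ```ignore
    /// match coord.try_grant_object_write_lock(object_id) {
    ///     // Fast path: nobody else holds the lock for this collection.
    ///     Some(guard) => proceed(guard),
    ///     // Slow path: wait for the current holder to release it.
    ///     None => defer(coord.grant_object_write_lock(object_id)),
    /// }
    /// ```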
    pub(crate) fn try_grant_object_write_lock(
        &mut self,
        object_id: CatalogItemId,
    ) -> Option<OwnedMutexGuard<()>> {
        let write_lock_handle = self
            .write_locks
            .entry(object_id)
            .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())));
        let write_lock_handle = Arc::clone(write_lock_handle);

        write_lock_handle.try_lock_owned().ok()
    }
}

/// Helper struct to run a builtin table append.
pub struct BuiltinTableAppend<'a> {
    coord: &'a mut Coordinator,
}

/// `Future` that notifies when a builtin table write has completed.
///
/// Note: builtin table writes need to talk to persist, which can take hundreds of milliseconds.
/// This type allows you to execute a builtin table write, e.g. via [`BuiltinTableAppend::execute`],
/// and wait for it to complete while other long-running tasks execute concurrently.
pub type BuiltinTableAppendNotify = Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>;

impl<'a> BuiltinTableAppend<'a> {
    /// Submit a write to a system table to be executed during the next group commit. This method
    /// __does not__ trigger a group commit.
    ///
    /// This is useful for non-critical writes like metric updates because it allows us to
    /// piggyback on the next group commit instead of triggering a potentially expensive one.
    ///
    /// Note: __do not__ call this for DDL which needs the system tables updated immediately.
    ///
    /// Note: When in read-only mode, this will buffer the update and return
    /// immediately.
    pub fn background(self, mut updates: Vec<BuiltinTableUpdate>) -> BuiltinTableAppendNotify {
        if self.coord.controller.read_only() {
            self.coord
                .buffered_builtin_table_updates
                .as_mut()
                .expect("in read-only mode")
                .append(&mut updates);

            return Box::pin(futures::future::ready(()));
        }

        let (tx, rx) = oneshot::channel();
        self.coord.pending_writes.push(PendingWriteTxn::System {
            updates,
            source: BuiltinTableUpdateSource::Background(tx),
        });

        Box::pin(rx.map(|_| ()))
    }

    /// Submits a write to be executed during the next group commit __and__ triggers a group commit.
    ///
    /// Returns a `Future` that resolves when the write has completed; it does not block the
    /// Coordinator.
    ///
    /// Note: When in read-only mode, this will buffer the update and the
    /// returned future will resolve immediately, without the update actually
    /// having been written.
    pub fn defer(self, mut updates: Vec<BuiltinTableUpdate>) -> BuiltinTableAppendNotify {
        if self.coord.controller.read_only() {
            self.coord
                .buffered_builtin_table_updates
                .as_mut()
                .expect("in read-only mode")
                .append(&mut updates);

            return Box::pin(futures::future::ready(()));
        }

        let (tx, rx) = oneshot::channel();
        self.coord.pending_writes.push(PendingWriteTxn::System {
            updates,
            source: BuiltinTableUpdateSource::Internal(tx),
        });
        self.coord.trigger_group_commit();

        Box::pin(rx.map(|_| ()))
    }

    /// Submit a write to a system table.
    ///
    /// This method will block the Coordinator on acquiring a write timestamp from the timestamp
    /// oracle, then return the write timestamp along with a `Future` that will complete once the
    /// write has been applied.
    ///
    /// Note: When in read-only mode, this will buffer the update, the
    /// returned future will resolve immediately, without the update actually
    /// having been written, and no timestamp is returned.
    pub async fn execute(
        self,
        mut updates: Vec<BuiltinTableUpdate>,
    ) -> (BuiltinTableAppendNotify, Option<Timestamp>) {
        if self.coord.controller.read_only() {
            self.coord
                .buffered_builtin_table_updates
                .as_mut()
                .expect("in read-only mode")
                .append(&mut updates);

            return (Box::pin(futures::future::ready(())), None);
        }

        let (tx, rx) = oneshot::channel();

        // Most DDL queries cause writes to system tables. Unlike writes to user tables, system
        // table writes do not wait for a group commit, they explicitly trigger one. There is a
        // possibility that if a user is executing DDL at a rate faster than 1 query per
        // millisecond, then the global timeline will unboundedly advance past the system clock.
        // This can cause future queries to block, but will not affect correctness. Since this
        // rate of DDL is unlikely, we allow DDL to explicitly trigger group commit.
        self.coord.pending_writes.push(PendingWriteTxn::System {
            updates,
            source: BuiltinTableUpdateSource::Internal(tx),
        });
        let write_ts = self.coord.group_commit(None).await;

        // Avoid excessive group commits by resetting the periodic table advancement timer. The
        // group commit triggered above will already advance all tables.
        self.coord.advance_timelines_interval.reset();

        (Box::pin(rx.map(|_| ())), Some(write_ts))
    }

    /// Submit a write to a system table, blocking until complete.
    ///
    /// Note: if possible you should use the `execute(...)` method, which returns a `Future` that
    /// can be `await`-ed concurrently with other tasks.
    ///
    /// Note: When in read-only mode, this will buffer the update and the
    /// returned future will resolve immediately, without the update actually
    /// having been written.
    pub async fn blocking(self, updates: Vec<BuiltinTableUpdate>) {
        let (notify, _) = self.execute(updates).await;
        notify.await;
    }
}

/// Returns two sides of a "channel" that can be used to notify the coordinator when we want a
/// group commit to be run.
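///
/// A minimal usage sketch (illustrative only, not a doctest, since these types are wired into
/// the coordinator's event loop):
///
/// ```ignore
/// let (notifier, waiter) = notifier();
/// // One task signals that a group commit is wanted...
/// notifier.notify();
/// // ...while the coordinator waits for both a request and an in-progress
/// // permit before actually running the commit.
/// let permit = waiter.ready().await;
/// ```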
pub fn notifier() -> (GroupCommitNotifier, GroupCommitWaiter) {
    let notify = Arc::new(Notify::new());
    let in_progress = Arc::new(Semaphore::new(1));

    let notifier = GroupCommitNotifier {
        notify: Arc::clone(&notify),
    };
    let waiter = GroupCommitWaiter {
        notify,
        in_progress,
    };

    (notifier, waiter)
}

/// A handle that allows us to notify the coordinator that a group commit should be run at some
/// point in the future.
#[derive(Debug, Clone)]
pub struct GroupCommitNotifier {
    /// Tracks if there are any outstanding group commits.
    notify: Arc<Notify>,
}

impl GroupCommitNotifier {
    /// Notifies the [`GroupCommitWaiter`] that we'd like a group commit to be run.
    pub fn notify(&self) {
        self.notify.notify_one()
    }
}

/// A handle that returns a future when a group commit needs to be run, and one is not currently
/// being run.
#[derive(Debug)]
pub struct GroupCommitWaiter {
    /// Tracks if there are any outstanding group commits.
    notify: Arc<Notify>,
    /// Distributes permits which track in-progress group commits.
    in_progress: Arc<Semaphore>,
}
static_assertions::assert_not_impl_all!(GroupCommitWaiter: Clone);

impl GroupCommitWaiter {
    /// Returns a permit for a group commit, once a permit is available _and_ someone has
    /// requested a group commit to be run.
    ///
    /// # Cancel Safety
    ///
    /// * Waiting on the returned Future is cancel safe because we acquire an in-progress permit
    ///   before waiting for notifications. If the Future gets dropped after acquiring a permit but
    ///   before a group commit is queued, we'll release the permit which can be acquired by the
    ///   next caller.
    ///
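    /// A sketch of how this pairs with `tokio::select!` (illustrative only; `group_commit_rx`
    /// is a hypothetical name for the coordinator's `GroupCommitWaiter`):
    ///
    /// ```ignore
    /// tokio::select! {
    ///     // Cancel safe: if another arm wins, the permit is released for the next caller.
    ///     permit = group_commit_rx.ready() => {
    ///         coord.try_group_commit(Some(permit)).await;
    ///     }
    ///     // ... other coordinator events ...
    /// }
    /// ```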
    pub async fn ready(&self) -> GroupCommitPermit {
        let permit = Semaphore::acquire_owned(Arc::clone(&self.in_progress))
            .await
            .expect("semaphore should not close");

        // Note: We must wait for notifies _after_ acquiring a permit, for cancel safety.
        self.notify.notified().await;

        GroupCommitPermit {
            _permit: Some(permit),
        }
    }
}

/// A permit to run a group commit; this must be kept alive for the entire duration of the commit.
///
/// Note: We sometimes want to throttle how many group commits are running at once, which this
/// permit allows us to do.
#[derive(Debug)]
pub struct GroupCommitPermit {
    /// Permit that is preventing other group commits from running.
    ///
    /// Only `None` if the permit has been moved into a tokio task for waiting.
    _permit: Option<OwnedSemaphorePermit>,
}

/// When we start a [`Session`] we need to update some builtin tables, but we don't want to wait for
/// these writes to complete for two reasons:
///
/// 1. Doing a write can take a relatively long time.
/// 2. Decoupling the write from the session start allows us to batch multiple writes together, if
///    sessions are being created with a high frequency.
///
/// So, as an optimization we do not wait for these writes to complete. But if a [`Session`] tries
/// to query any of these builtin objects, we need to block that query on the writes completing to
/// maintain linearizability.
///
/// Warning: this already clears the wait flag (i.e., it calls `clear_builtin_table_updates`).
///
/// TODO(peek-seq): After we delete the old peek sequencing, we can remove the first component of
/// the return tuple.
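///
/// A hedged usage sketch (illustrative only; the surrounding sequencing code is elided):
///
/// ```ignore
/// // If the plan reads a required builtin table (e.g. mz_sessions), block it on
/// // the startup writes landing before we sequence it.
/// if let Some((_deps, wait)) = waiting_on_startup_appends(catalog, session, &plan) {
///     wait.await;
/// }
/// ```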
pub(crate) fn waiting_on_startup_appends(
    catalog: &Catalog,
    session: &mut Session,
    plan: &Plan,
) -> Option<(BTreeSet<CatalogItemId>, BoxFuture<'static, ()>)> {
    // TODO(parkmycar): We need to check transitive uses here too if we ever move the
    // referenced builtin tables out of mz_internal, or we allow creating views on
    // mz_internal objects.
    let depends_on = match plan {
        Plan::Select(plan) => plan.source.depends_on(),
        Plan::ReadThenWrite(plan) => plan.selection.depends_on(),
        Plan::ShowColumns(plan) => plan.select_plan.source.depends_on(),
        Plan::Subscribe(plan) => plan.from.depends_on(),
        Plan::ExplainPlan(ExplainPlanPlan {
            explainee: Explainee::Statement(ExplaineeStatement::Select { plan, .. }),
            ..
        }) => plan.source.depends_on(),
        Plan::ExplainTimestamp(ExplainTimestampPlan { raw_plan, .. }) => raw_plan.depends_on(),
        Plan::CreateConnection(_)
        | Plan::CreateDatabase(_)
        | Plan::CreateSchema(_)
        | Plan::CreateRole(_)
        | Plan::CreateNetworkPolicy(_)
        | Plan::CreateCluster(_)
        | Plan::CreateClusterReplica(_)
        | Plan::CreateContinualTask(_)
        | Plan::CreateSource(_)
        | Plan::CreateSources(_)
        | Plan::CreateSecret(_)
        | Plan::CreateSink(_)
        | Plan::CreateTable(_)
        | Plan::CreateView(_)
        | Plan::CreateMaterializedView(_)
        | Plan::CreateIndex(_)
        | Plan::CreateType(_)
        | Plan::Comment(_)
        | Plan::DiscardTemp
        | Plan::DiscardAll
        | Plan::DropObjects(_)
        | Plan::DropOwned(_)
        | Plan::EmptyQuery
        | Plan::ShowAllVariables
        | Plan::ShowCreate(_)
        | Plan::ShowVariable(_)
        | Plan::InspectShard(_)
        | Plan::SetVariable(_)
        | Plan::ResetVariable(_)
        | Plan::SetTransaction(_)
        | Plan::StartTransaction(_)
        | Plan::CommitTransaction(_)
        | Plan::AbortTransaction(_)
        | Plan::CopyFrom(_)
        | Plan::CopyTo(_)
        | Plan::ExplainPlan(_)
        | Plan::ExplainPushdown(_)
        | Plan::ExplainSinkSchema(_)
        | Plan::Insert(_)
        | Plan::AlterNetworkPolicy(_)
        | Plan::AlterNoop(_)
        | Plan::AlterClusterRename(_)
        | Plan::AlterClusterSwap(_)
        | Plan::AlterClusterReplicaRename(_)
        | Plan::AlterCluster(_)
        | Plan::AlterConnection(_)
        | Plan::AlterSource(_)
        | Plan::AlterSetCluster(_)
        | Plan::AlterItemRename(_)
        | Plan::AlterRetainHistory(_)
        | Plan::AlterSchemaRename(_)
        | Plan::AlterSchemaSwap(_)
        | Plan::AlterSecret(_)
        | Plan::AlterSink(_)
        | Plan::AlterSystemSet(_)
        | Plan::AlterSystemReset(_)
        | Plan::AlterSystemResetAll(_)
        | Plan::AlterRole(_)
        | Plan::AlterOwner(_)
        | Plan::AlterTableAddColumn(_)
        | Plan::AlterMaterializedViewApplyReplacement(_)
        | Plan::Declare(_)
        | Plan::Fetch(_)
        | Plan::Close(_)
        | Plan::Prepare(_)
        | Plan::Execute(_)
        | Plan::Deallocate(_)
        | Plan::Raise(_)
        | Plan::GrantRole(_)
        | Plan::RevokeRole(_)
        | Plan::GrantPrivileges(_)
        | Plan::RevokePrivileges(_)
        | Plan::AlterDefaultPrivileges(_)
        | Plan::ReassignOwned(_)
        | Plan::ValidateConnection(_)
        | Plan::SideEffectingFunc(_) => BTreeSet::default(),
    };
    let depends_on_required_id = REQUIRED_BUILTIN_TABLES
        .iter()
        .map(|table| catalog.resolve_builtin_table(&**table))
        .any(|id| {
            catalog
                .get_global_ids(&id)
                .any(|gid| depends_on.contains(&gid))
        });

    // If our plan does not depend on any required ID, then we don't need to
    // wait for any builtin writes to occur.
    if !depends_on_required_id {
        return None;
    }

    // Even if we depend on a builtin table, there's no need to wait if the
    // writes have already completed.
    //
    // TODO(parkmycar): As an optimization we should add a `Notify` type to
    // `mz_ore` that allows peeking. If the builtin table writes have already
    // completed then there is no need to defer this plan.
    match session.clear_builtin_table_updates() {
        Some(wait_future) => {
            let depends_on = depends_on
                .into_iter()
                .map(|gid| catalog.get_entry_by_global_id(&gid).id())
                .collect();
            Some((depends_on, wait_future.boxed()))
        }
        None => None,
    }
}