mz_adapter/coord/appends.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Logic and types for all appends executed by the [`Coordinator`].
11
12use std::collections::{BTreeMap, BTreeSet};
13use std::future::Future;
14use std::pin::Pin;
15use std::sync::{Arc, LazyLock};
16use std::time::{Duration, Instant};
17
18use derivative::Derivative;
19use futures::future::{BoxFuture, FutureExt};
20use mz_adapter_types::connection::ConnectionId;
21use mz_catalog::builtin::{BuiltinTable, MZ_SESSIONS};
22use mz_expr::CollectionPlan;
23use mz_ore::metrics::MetricsFutureExt;
24use mz_ore::task;
25use mz_ore::tracing::OpenTelemetryContext;
26use mz_ore::{assert_none, instrument};
27use mz_repr::{CatalogItemId, Timestamp};
28use mz_sql::names::ResolvedIds;
29use mz_sql::plan::{ExplainPlanPlan, ExplainTimestampPlan, Explainee, ExplaineeStatement, Plan};
30use mz_sql::session::metadata::SessionMetadata;
31use mz_storage_client::client::TableData;
32use mz_timestamp_oracle::WriteTimestamp;
33use smallvec::SmallVec;
34use tokio::sync::{Notify, OwnedMutexGuard, OwnedSemaphorePermit, Semaphore, oneshot};
35use tracing::{Instrument, Span, debug_span, info, warn};
36
37use crate::catalog::{BuiltinTableUpdate, Catalog};
38use crate::coord::{Coordinator, Message, PendingTxn, PlanValidity};
39use crate::session::{GroupCommitWriteLocks, Session, WriteLocks};
40use crate::util::{CompletedClientTransmitter, ResultExt};
41use crate::{AdapterError, ExecuteContext};
42
/// Builtin tables that we emit updates for whenever a new session is started.
///
/// Currently this is only [`MZ_SESSIONS`].
pub(crate) static REQUIRED_BUILTIN_TABLES: &[&LazyLock<BuiltinTable>] = &[&MZ_SESSIONS];
45
46/// An operation that was deferred waiting on a resource to be available.
47///
48/// For example when inserting into a table we defer on acquiring [`WriteLocks`].
49#[derive(Debug)]
50pub enum DeferredOp {
51 /// A plan, e.g. ReadThenWrite, that needs locks before sequencing.
52 Plan(DeferredPlan),
53 /// Inserts into a collection.
54 Write(DeferredWrite),
55}
56
57impl DeferredOp {
58 /// Certain operations, e.g. "blind writes"/`INSERT` statements, can be optimistically retried
59 /// because we can share a write lock between multiple operations. In this case we wait to
60 /// acquire the locks until [`group_commit`], where writes are groupped by collection and
61 /// comitted at a single timestamp.
62 ///
63 /// Other operations, e.g. read-then-write plans/`UPDATE` statements, must uniquely hold their
64 /// write locks and thus we should acquire the locks in [`try_deferred`] to prevent multiple
65 /// queued plans attempting to get retried at the same time, when we know only one can proceed.
66 ///
67 /// [`try_deferred`]: crate::coord::Coordinator::try_deferred
68 /// [`group_commit`]: crate::coord::Coordinator::group_commit
69 pub(crate) fn can_be_optimistically_retried(&self) -> bool {
70 match self {
71 DeferredOp::Plan(_) => false,
72 DeferredOp::Write(_) => true,
73 }
74 }
75
76 /// Returns an Iterator of all the required locks for current operation.
77 pub fn required_locks(&self) -> impl Iterator<Item = CatalogItemId> + '_ {
78 match self {
79 DeferredOp::Plan(plan) => {
80 let iter = plan.requires_locks.iter().copied();
81 itertools::Either::Left(iter)
82 }
83 DeferredOp::Write(write) => {
84 let iter = write.writes.keys().copied();
85 itertools::Either::Right(iter)
86 }
87 }
88 }
89
90 /// Returns the [`ConnectionId`] associated with this deferred op.
91 pub fn conn_id(&self) -> &ConnectionId {
92 match self {
93 DeferredOp::Plan(plan) => plan.ctx.session().conn_id(),
94 DeferredOp::Write(write) => write.pending_txn.ctx.session().conn_id(),
95 }
96 }
97
98 /// Consumes the [`DeferredOp`], returning the inner [`ExecuteContext`].
99 pub fn into_ctx(self) -> ExecuteContext {
100 match self {
101 DeferredOp::Plan(plan) => plan.ctx,
102 DeferredOp::Write(write) => write.pending_txn.ctx,
103 }
104 }
105}
106
107/// Describes a plan that is awaiting [`WriteLocks`].
108#[derive(Derivative)]
109#[derivative(Debug)]
110pub struct DeferredPlan {
111 #[derivative(Debug = "ignore")]
112 pub ctx: ExecuteContext,
113 pub plan: Plan,
114 pub validity: PlanValidity,
115 pub requires_locks: BTreeSet<CatalogItemId>,
116}
117
118#[derive(Debug)]
119pub struct DeferredWrite {
120 pub span: Span,
121 pub writes: BTreeMap<CatalogItemId, SmallVec<[TableData; 1]>>,
122 pub pending_txn: PendingTxn,
123}
124
125/// Describes what action triggered an update to a builtin table.
126#[derive(Debug)]
127pub(crate) enum BuiltinTableUpdateSource {
128 /// Internal update, notify the caller when it's complete.
129 Internal(oneshot::Sender<()>),
130 /// Update was triggered by some background process, such as periodic heartbeats from COMPUTE.
131 Background(oneshot::Sender<()>),
132}
133
134/// Where to deliver the result of a [`PendingWriteTxn::User`] write.
135#[derive(Debug)]
136pub(crate) enum UserWriteResponder {
137 /// Session-bound write. The coordinator retires the session's
138 /// `ExecuteContext` once the write commits.
139 Session(PendingTxn),
140}
141
142/// A pending write transaction that will be committing during the next group commit.
143#[derive(Debug)]
144pub(crate) enum PendingWriteTxn {
145 /// Write to a user table. The write timestamp is picked by the oracle
146 /// during group commit. The write lock is either handed off from the
147 /// submitting session (via `write_locks: Some(..)`) or acquired during
148 /// group commit (`write_locks: None`).
149 User {
150 span: Span,
151 /// List of all write operations within the transaction.
152 writes: BTreeMap<CatalogItemId, SmallVec<[TableData; 1]>>,
153 /// If they exist, should contain locks for each [`CatalogItemId`] in `writes`.
154 write_locks: Option<WriteLocks>,
155 /// Where to deliver the result once the write commits.
156 responder: UserWriteResponder,
157 },
158 /// Write to a system table.
159 System {
160 updates: Vec<BuiltinTableUpdate>,
161 source: BuiltinTableUpdateSource,
162 },
163}
164
165impl PendingWriteTxn {
166 fn is_internal_system(&self) -> bool {
167 match self {
168 PendingWriteTxn::System {
169 source: BuiltinTableUpdateSource::Internal(_),
170 ..
171 } => true,
172 _ => false,
173 }
174 }
175}
176
177impl Coordinator {
178 /// Send a message to the Coordinate to start a group commit.
179 pub(crate) fn trigger_group_commit(&mut self) {
180 self.group_commit_tx.notify();
181 // Avoid excessive `Message::GroupCommitInitiate` by resetting the periodic table
182 // advancement. The group commit triggered by the message above will already advance all
183 // tables.
184 self.advance_timelines_interval.reset();
185 }
186
187 /// Tries to execute a previously [`DeferredOp`] that requires write locks.
188 ///
189 /// If we can't acquire all of the write locks then we'll defer the plan again and wait for
190 /// the necessary locks to become available.
191 pub(crate) async fn try_deferred(
192 &mut self,
193 conn_id: ConnectionId,
194 acquired_lock: Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>,
195 ) {
196 // Try getting the deferred op, it may have already been canceled.
197 let Some(op) = self.deferred_write_ops.remove(&conn_id) else {
198 tracing::warn!(%conn_id, "no deferred op found, it must have been canceled?");
199 return;
200 };
201 tracing::info!(%conn_id, "trying deferred plan");
202
203 // If we pre-acquired a lock, try to acquire the rest.
204 let write_locks = match acquired_lock {
205 Some((acquired_gid, acquired_lock)) => {
206 let mut write_locks = WriteLocks::builder(op.required_locks());
207
208 // Insert the one lock we already acquired into the our builder.
209 write_locks.insert_lock(acquired_gid, acquired_lock);
210
211 // Acquire the rest of our locks, filtering out the one we already have.
212 for gid in op.required_locks().filter(|gid| *gid != acquired_gid) {
213 if let Some(lock) = self.try_grant_object_write_lock(gid) {
214 write_locks.insert_lock(gid, lock);
215 }
216 }
217
218 // If we failed to acquire any locks, spawn a task that waits for them to become available.
219 let locks = match write_locks.all_or_nothing(op.conn_id()) {
220 Ok(locks) => locks,
221 Err(failed_to_acquire) => {
222 let acquire_future = self
223 .grant_object_write_lock(failed_to_acquire)
224 .map(Option::Some);
225 self.defer_op(acquire_future, op);
226 return;
227 }
228 };
229
230 Some(locks)
231 }
232 None => None,
233 };
234
235 match op {
236 DeferredOp::Plan(mut deferred) => {
237 if let Err(e) = deferred.validity.check(self.catalog()) {
238 deferred.ctx.retire(Err(e))
239 } else {
240 // Write statements never need to track resolved IDs (NOTE: This is not the
241 // same thing as plan dependencies, which we do need to re-validate).
242 let resolved_ids = ResolvedIds::empty();
243
244 // If we pre-acquired our locks, grant them to the session.
245 if let Some(locks) = write_locks {
246 let conn_id = deferred.ctx.session().conn_id().clone();
247 if let Err(existing) =
248 deferred.ctx.session_mut().try_grant_write_locks(locks)
249 {
250 tracing::error!(
251 %conn_id,
252 ?existing,
253 "session already write locks granted?",
254 );
255 return deferred.ctx.retire(Err(AdapterError::WrongSetOfLocks));
256 }
257 };
258
259 // Note: This plan is not guaranteed to run, it may get deferred again.
260 self.sequence_plan(
261 deferred.ctx,
262 deferred.plan,
263 resolved_ids,
264 ResolvedIds::empty(),
265 )
266 .await;
267 }
268 }
269 DeferredOp::Write(DeferredWrite {
270 span,
271 writes,
272 pending_txn,
273 }) => {
274 self.submit_write(PendingWriteTxn::User {
275 span,
276 writes,
277 write_locks,
278 responder: UserWriteResponder::Session(pending_txn),
279 });
280 }
281 }
282 }
283
284 /// Attempts to commit all pending write transactions in a group commit. If the timestamp
285 /// chosen for the writes is not ahead of `now()`, then we can execute and commit the writes
286 /// immediately. Otherwise we must wait for `now()` to advance past the timestamp chosen for the
287 /// writes.
288 #[instrument(level = "debug")]
289 pub(crate) async fn try_group_commit(&mut self, permit: Option<GroupCommitPermit>) {
290 let timestamp = self.peek_local_write_ts().await;
291 let now = Timestamp::from((self.catalog().config().now)());
292
293 // HACK: This is a special case to allow writes to the mz_sessions table to proceed even
294 // if the timestamp oracle is ahead of the current walltime. We do this because there are
295 // some tests that mock the walltime, so it doesn't automatically advance, and updating
296 // those tests to advance the walltime while creating a connection is too much.
297 //
298 // TODO(parkmycar): Get rid of the check below when refactoring group commits.
299 let contains_internal_system_write = self
300 .pending_writes
301 .iter()
302 .any(|write| write.is_internal_system());
303
304 if timestamp > now && !contains_internal_system_write {
305 // Cap retry time to 1s. In cases where the system clock has retreated by
306 // some large amount of time, this prevents against then waiting for that
307 // large amount of time in case the system clock then advances back to near
308 // what it was.
309 let remaining_ms = std::cmp::min(timestamp.saturating_sub(now), 1_000.into());
310 let internal_cmd_tx = self.internal_cmd_tx.clone();
311 task::spawn(
312 || "group_commit_initiate",
313 async move {
314 tokio::time::sleep(Duration::from_millis(remaining_ms.into())).await;
315 // It is not an error for this task to be running after `internal_cmd_rx` is dropped.
316 let result =
317 internal_cmd_tx.send(Message::GroupCommitInitiate(Span::current(), permit));
318 if let Err(e) = result {
319 warn!("internal_cmd_rx dropped before we could send: {:?}", e);
320 }
321 }
322 .instrument(Span::current()),
323 );
324 } else {
325 self.group_commit(permit).await;
326 }
327 }
328
329 /// Tries to commit all pending writes transactions at the same timestamp.
330 ///
331 /// If the caller of this function has the `write_lock` acquired, then they can optionally pass
332 /// it in to this method. If the caller does not have the `write_lock` acquired and the
333 /// `write_lock` is currently locked by another operation, then only writes to system tables
334 /// and table advancements will be applied. If the caller does not have the `write_lock`
335 /// acquired and the `write_lock` is not currently locked by another operation, then group
336 /// commit will acquire it and all writes will be applied.
337 ///
338 /// All applicable pending writes will be combined into a single Append command and sent to
339 /// STORAGE as a single batch. All applicable writes will happen at the same timestamp and all
340 /// involved tables will be advanced to some timestamp larger than the timestamp of the write.
341 ///
342 /// Returns the timestamp of the write.
343 #[instrument(name = "coord::group_commit")]
344 pub(crate) async fn group_commit(&mut self, permit: Option<GroupCommitPermit>) -> Timestamp {
345 let mut validated_writes = Vec::new();
346 let mut deferred_writes = Vec::new();
347 let mut group_write_locks = GroupCommitWriteLocks::default();
348
349 // TODO(parkmycar): Refactor away this allocation. Currently `drain(..)` requires holding
350 // a mutable borrow on the Coordinator and so does trying to grant a write lock.
351 let pending_writes: Vec<_> = self.pending_writes.drain(..).collect();
352
353 // Validate, merge, and possibly acquire write locks for as many pending writes as possible.
354 for pending_write in pending_writes {
355 match pending_write {
356 // We always allow system writes to proceed.
357 PendingWriteTxn::System { .. } => validated_writes.push(pending_write),
358 // We have a set of locks! Validate they're correct (expected).
359 PendingWriteTxn::User {
360 span,
361 write_locks: Some(write_locks),
362 writes,
363 responder: UserWriteResponder::Session(pending_txn),
364 } => match write_locks.validate(writes.keys().copied()) {
365 Ok(validated_locks) => {
366 // Merge all of our write locks together since we can allow concurrent
367 // writes at the same timestamp.
368 group_write_locks.merge(validated_locks);
369
370 let validated_write = PendingWriteTxn::User {
371 span,
372 writes,
373 write_locks: None,
374 responder: UserWriteResponder::Session(pending_txn),
375 };
376 validated_writes.push(validated_write);
377 }
378 // This is very unexpected since callers of this method should be validating.
379 //
380 // We cannot allow these write to occur since if the correct set of locks was
381 // not taken we could violate serializability.
382 Err(missing) => {
383 let writes: Vec<_> = writes.keys().collect();
384 panic!(
385 "got to group commit with partial set of locks!\nmissing: {:?}, writes: {:?}, txn: {:?}",
386 missing, writes, pending_txn,
387 );
388 }
389 },
390 // If we don't have any locks, try to acquire them, otherwise defer the write.
391 PendingWriteTxn::User {
392 span,
393 writes,
394 write_locks: None,
395 responder: UserWriteResponder::Session(pending_txn),
396 } => {
397 let missing = group_write_locks.missing_locks(writes.keys().copied());
398
399 if missing.is_empty() {
400 // We have all the locks! Queue the pending write.
401 let validated_write = PendingWriteTxn::User {
402 span,
403 writes,
404 write_locks: None,
405 responder: UserWriteResponder::Session(pending_txn),
406 };
407 validated_writes.push(validated_write);
408 } else {
409 // Try to acquire the locks we're missing.
410 let mut just_in_time_locks = WriteLocks::builder(missing.clone());
411 for collection in missing {
412 if let Some(lock) = self.try_grant_object_write_lock(collection) {
413 just_in_time_locks.insert_lock(collection, lock);
414 }
415 }
416
417 match just_in_time_locks.all_or_nothing(pending_txn.ctx.session().conn_id())
418 {
419 // We acquired all of the locks! Proceed with the write.
420 Ok(locks) => {
421 group_write_locks.merge(locks);
422 let validated_write = PendingWriteTxn::User {
423 span,
424 writes,
425 write_locks: None,
426 responder: UserWriteResponder::Session(pending_txn),
427 };
428 validated_writes.push(validated_write);
429 }
430 // Darn. We couldn't acquire the locks, defer the write.
431 Err(missing) => {
432 let acquire_future =
433 self.grant_object_write_lock(missing).map(Option::Some);
434 let write = DeferredWrite {
435 span,
436 writes,
437 pending_txn,
438 };
439 deferred_writes.push((acquire_future, write));
440 }
441 }
442 }
443 }
444 }
445 }
446
447 // Queue all of our deferred ops.
448 for (acquire_future, write) in deferred_writes {
449 self.defer_op(acquire_future, DeferredOp::Write(write));
450 }
451
452 // The value returned here still might be ahead of `now()` if `now()` has gone backwards at
453 // any point during this method or if this was triggered from DDL. We will still commit the
454 // write without waiting for `now()` to advance. This is ok because the next batch of writes
455 // will trigger the wait loop in `try_group_commit()` if `now()` hasn't advanced past the
456 // global timeline, preventing an unbounded advancing of the global timeline ahead of
457 // `now()`. Additionally DDL is infrequent enough and takes long enough that we don't think
458 // it's practical for continuous DDL to advance the global timestamp in an unbounded manner.
459 let WriteTimestamp {
460 timestamp,
461 advance_to,
462 } = self.get_local_write_ts().await;
463
464 // Advance the catalog shard's upper to keep it in sync with the oracle
465 // timestamp. This ensures that reads of mz_catalog_raw at the oracle's
466 // read_ts do not block waiting for the catalog shard's upper to advance.
467 let catalog_upper_start = Instant::now();
468 self.catalog
469 .advance_upper(advance_to)
470 .await
471 .unwrap_or_terminate("unable to advance catalog upper");
472 self.metrics
473 .group_commit_catalog_upper_seconds
474 .observe(catalog_upper_start.elapsed().as_secs_f64());
475
476 let mut appends: BTreeMap<CatalogItemId, SmallVec<[TableData; 1]>> = BTreeMap::new();
477 let mut responses = Vec::with_capacity(validated_writes.len());
478 let mut notifies = Vec::new();
479
480 for validated_write_txn in validated_writes {
481 match validated_write_txn {
482 PendingWriteTxn::User {
483 span: _,
484 writes,
485 write_locks,
486 responder:
487 UserWriteResponder::Session(PendingTxn {
488 ctx,
489 response,
490 action,
491 }),
492 } => {
493 assert_none!(write_locks, "should have merged together all locks above");
494 for (id, table_data) in writes {
495 // If the table that some write was targeting has been deleted while the
496 // write was waiting, then the write will be ignored and we respond to the
497 // client that the write was successful. This is only possible if the write
498 // and the delete were concurrent. Therefore, we are free to order the
499 // write before the delete without violating any consistency guarantees.
500 if self.catalog().try_get_entry(&id).is_some() {
501 appends.entry(id).or_default().extend(table_data);
502 }
503 }
504 if let Some(id) = ctx.extra().contents() {
505 self.set_statement_execution_timestamp(id, timestamp);
506 }
507
508 responses.push(CompletedClientTransmitter::new(ctx, response, action));
509 }
510 PendingWriteTxn::System { updates, source } => {
511 for update in updates {
512 appends.entry(update.id).or_default().push(update.data);
513 }
514 // Once the write completes we notify any waiters.
515 match source {
516 BuiltinTableUpdateSource::Internal(tx)
517 | BuiltinTableUpdateSource::Background(tx) => notifies.push(tx),
518 }
519 }
520 }
521 }
522
523 // Consolidate all Rows for a given table. We do not consolidate the
524 // staged batches, that's up to whoever staged them.
525 let mut all_appends = Vec::with_capacity(appends.len());
526 for (item_id, table_data) in appends.into_iter() {
527 let mut all_rows = Vec::new();
528 let mut all_data = Vec::new();
529 for data in table_data {
530 match data {
531 TableData::Rows(rows) => all_rows.extend(rows),
532 TableData::Batches(_) => all_data.push(data),
533 }
534 }
535 differential_dataflow::consolidation::consolidate(&mut all_rows);
536 all_data.push(TableData::Rows(all_rows));
537
538 // TODO(parkmycar): Use SmallVec throughout.
539 all_appends.push((item_id, all_data));
540 }
541
542 let appends: Vec<_> = all_appends
543 .into_iter()
544 .map(|(id, updates)| {
545 let gid = self.catalog().get_entry(&id).latest_global_id();
546 (gid, updates)
547 })
548 .collect();
549
550 // Log non-empty user appends.
551 let modified_tables: Vec<_> = appends
552 .iter()
553 .filter_map(|(id, updates)| {
554 if id.is_user() && !updates.iter().all(|u| u.is_empty()) {
555 Some(id)
556 } else {
557 None
558 }
559 })
560 .collect();
561 if !modified_tables.is_empty() {
562 info!(
563 "Appending to tables, {modified_tables:?}, at {timestamp}, advancing to {advance_to}"
564 );
565 }
566
567 // Instrument our table writes since they can block the coordinator.
568 let histogram = self.metrics.append_table_duration_seconds.clone();
569
570 // NOTE: It is important that we append, even when there are no actual
571 // appends. This makes sure we periodically bump the upper of all
572 // tables, which is required to make them readable at the latest oracle
573 // read ts.
574
575 let append_fut = self
576 .controller
577 .storage
578 .append_table(timestamp, advance_to, appends)
579 .expect("invalid updates")
580 .wall_time()
581 .observe(histogram);
582
583 // Spawn a task to do the table writes.
584 let internal_cmd_tx = self.internal_cmd_tx.clone();
585 let apply_write_fut = self.apply_local_write(timestamp);
586
587 let span = debug_span!(parent: None, "group_commit_apply");
588 OpenTelemetryContext::obtain().attach_as_parent_to(&span);
589 task::spawn(
590 || "group_commit_apply",
591 async move {
592 // Wait for the writes to complete.
593 match append_fut
594 .instrument(debug_span!("group_commit_apply::append_fut"))
595 .await
596 {
597 Ok(append_result) => {
598 append_result.unwrap_or_terminate("cannot fail to apply appends")
599 }
600 Err(_) => warn!("Writer terminated with writes in indefinite state"),
601 };
602
603 // Apply the write by marking the timestamp as complete on the timeline.
604 apply_write_fut
605 .instrument(debug_span!("group_commit_apply::append_write_fut"))
606 .await;
607
608 // Notify the external clients of the result.
609 for response in responses {
610 let (mut ctx, result) = response.finalize();
611 ctx.session_mut().apply_write(timestamp);
612 ctx.retire(result);
613 }
614
615 // IMPORTANT: Make sure we hold the permit and write locks
616 // until here, to prevent other writes from going through while
617 // we haven't yet applied the write at the timestamp oracle.
618 drop(permit);
619 drop(group_write_locks);
620
621 // Advance other timelines.
622 if let Err(e) = internal_cmd_tx.send(Message::AdvanceTimelines) {
623 warn!("Server closed with non-advanced timelines, {e}");
624 }
625
626 for notify in notifies {
627 // We don't care if the listeners have gone away.
628 let _ = notify.send(());
629 }
630 }
631 .instrument(span),
632 );
633
634 timestamp
635 }
636
637 /// Submit a write to be executed during the next group commit and trigger a group commit.
638 pub(crate) fn submit_write(&mut self, pending_write_txn: PendingWriteTxn) {
639 if self.controller.read_only() {
640 panic!(
641 "attempting table write in read-only mode: {:?}",
642 pending_write_txn
643 );
644 }
645 self.pending_writes.push(pending_write_txn);
646 self.trigger_group_commit();
647 }
648
649 /// Append some [`BuiltinTableUpdate`]s, with various degrees of waiting and blocking.
650 pub(crate) fn builtin_table_update<'a>(&'a mut self) -> BuiltinTableAppend<'a> {
651 BuiltinTableAppend { coord: self }
652 }
653
654 pub(crate) fn defer_op<F>(&mut self, acquire_future: F, op: DeferredOp)
655 where
656 F: Future<Output = Option<(CatalogItemId, tokio::sync::OwnedMutexGuard<()>)>>
657 + Send
658 + 'static,
659 {
660 let conn_id = op.conn_id().clone();
661
662 // Track all of our deferred ops.
663 let is_optimistic = op.can_be_optimistically_retried();
664 self.deferred_write_ops.insert(conn_id.clone(), op);
665
666 let internal_cmd_tx = self.internal_cmd_tx.clone();
667 let conn_id_ = conn_id.clone();
668 mz_ore::task::spawn(|| format!("defer op {conn_id_}"), async move {
669 tracing::info!(%conn_id, "deferring plan");
670 // Once we can acquire the first failed lock, try running the deferred plan.
671 //
672 // Note: This does not guarantee the plan will be able to run, there might be
673 // other locks that we later fail to get.
674 let acquired_lock = acquire_future.await;
675
676 // Some operations, e.g. blind INSERTs, can be optimistically retried, meaning we
677 // can run multiple at once. In those cases we don't hold the lock so we retry all
678 // blind writes for a single object.
679 let acquired_lock = match (acquired_lock, is_optimistic) {
680 (Some(_lock), true) => None,
681 (Some(lock), false) => Some(lock),
682 (None, _) => None,
683 };
684
685 // If this send fails then the Coordinator is shutting down.
686 let _ = internal_cmd_tx.send(Message::TryDeferred {
687 conn_id,
688 acquired_lock,
689 });
690 });
691 }
692
693 /// Returns a future that waits until it can get an exclusive lock on the specified collection.
694 pub(crate) fn grant_object_write_lock(
695 &mut self,
696 object_id: CatalogItemId,
697 ) -> impl Future<Output = (CatalogItemId, OwnedMutexGuard<()>)> + 'static {
698 let write_lock_handle = self
699 .write_locks
700 .entry(object_id)
701 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())));
702 let write_lock_handle = Arc::clone(write_lock_handle);
703
704 write_lock_handle
705 .lock_owned()
706 .map(move |guard| (object_id, guard))
707 }
708
709 /// Lazily creates the lock for the provided `object_id`, and grants it if possible, returns
710 /// `None` if the lock is already held.
711 pub(crate) fn try_grant_object_write_lock(
712 &mut self,
713 object_id: CatalogItemId,
714 ) -> Option<OwnedMutexGuard<()>> {
715 let write_lock_handle = self
716 .write_locks
717 .entry(object_id)
718 .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(())));
719 let write_lock_handle = Arc::clone(write_lock_handle);
720
721 write_lock_handle.try_lock_owned().ok()
722 }
723}
724
725/// Helper struct to run a builtin table append.
726pub struct BuiltinTableAppend<'a> {
727 coord: &'a mut Coordinator,
728}
729
730/// `Future` that notifies when a builtin table write has completed.
731///
732/// Note: builtin table writes need to talk to persist, which can take 100s of milliseconds. This
733/// type allows you to execute a builtin table write, e.g. via [`BuiltinTableAppend::execute`], and
734/// wait for it to complete, while other long running tasks are concurrently executing.
735pub type BuiltinTableAppendNotify = Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>;
736
737impl<'a> BuiltinTableAppend<'a> {
738 /// Submit a write to a system table to be executed during the next group commit. This method
739 /// __does not__ trigger a group commit.
740 ///
741 /// This is useful for non-critical writes like metric updates because it allows us to piggy
742 /// back off the next group commit instead of triggering a potentially expensive group commit.
743 ///
744 /// Note: __do not__ call this for DDL which needs the system tables updated immediately.
745 ///
746 /// Note: When in read-only mode, this will buffer the update and return
747 /// immediately.
748 pub fn background(self, mut updates: Vec<BuiltinTableUpdate>) -> BuiltinTableAppendNotify {
749 if self.coord.controller.read_only() {
750 self.coord
751 .buffered_builtin_table_updates
752 .as_mut()
753 .expect("in read-only mode")
754 .append(&mut updates);
755
756 return Box::pin(futures::future::ready(()));
757 }
758
759 let (tx, rx) = oneshot::channel();
760 self.coord.pending_writes.push(PendingWriteTxn::System {
761 updates,
762 source: BuiltinTableUpdateSource::Background(tx),
763 });
764
765 Box::pin(rx.map(|_| ()))
766 }
767
768 /// Submits a write to be executed during the next group commit __and__ triggers a group commit.
769 ///
770 /// Returns a `Future` that resolves when the write has completed, does not block the
771 /// Coordinator.
772 ///
773 /// Note: When in read-only mode, this will buffer the update and the
774 /// returned future will resolve immediately, without the update actually
775 /// having been written.
776 pub fn defer(self, mut updates: Vec<BuiltinTableUpdate>) -> BuiltinTableAppendNotify {
777 if self.coord.controller.read_only() {
778 self.coord
779 .buffered_builtin_table_updates
780 .as_mut()
781 .expect("in read-only mode")
782 .append(&mut updates);
783
784 return Box::pin(futures::future::ready(()));
785 }
786
787 let (tx, rx) = oneshot::channel();
788 self.coord.pending_writes.push(PendingWriteTxn::System {
789 updates,
790 source: BuiltinTableUpdateSource::Internal(tx),
791 });
792 self.coord.trigger_group_commit();
793
794 Box::pin(rx.map(|_| ()))
795 }
796
797 /// Submit a write to a system table.
798 ///
799 /// This method will block the Coordinator on acquiring a write timestamp from the timestamp
800 /// oracle, and then returns a `Future` that will complete once the write has been applied and
801 /// the write timestamp.
802 ///
803 /// Note: When in read-only mode, this will buffer the update, the
804 /// returned future will resolve immediately, without the update actually
805 /// having been written, and no timestamp is returned.
806 pub async fn execute(
807 self,
808 mut updates: Vec<BuiltinTableUpdate>,
809 ) -> (BuiltinTableAppendNotify, Option<Timestamp>) {
810 if self.coord.controller.read_only() {
811 self.coord
812 .buffered_builtin_table_updates
813 .as_mut()
814 .expect("in read-only mode")
815 .append(&mut updates);
816
817 return (Box::pin(futures::future::ready(())), None);
818 }
819
820 let (tx, rx) = oneshot::channel();
821
822 // Most DDL queries cause writes to system tables. Unlike writes to user tables, system
823 // table writes do not wait for a group commit, they explicitly trigger one. There is a
824 // possibility that if a user is executing DDL at a rate faster than 1 query per
825 // millisecond, then the global timeline will unboundedly advance past the system clock.
826 // This can cause future queries to block, but will not affect correctness. Since this
827 // rate of DDL is unlikely, we allow DDL to explicitly trigger group commit.
828 self.coord.pending_writes.push(PendingWriteTxn::System {
829 updates,
830 source: BuiltinTableUpdateSource::Internal(tx),
831 });
832 let write_ts = self.coord.group_commit(None).await;
833
834 // Avoid excessive group commits by resetting the periodic table advancement timer. The
835 // group commit triggered by above will already advance all tables.
836 self.coord.advance_timelines_interval.reset();
837
838 (Box::pin(rx.map(|_| ())), Some(write_ts))
839 }
840
841 /// Submit a write to a system table, blocking until complete.
842 ///
843 /// Note: if possible you should use the `execute(...)` method, which returns a `Future` that
844 /// can be `await`-ed concurrently with other tasks.
845 ///
846 /// Note: When in read-only mode, this will buffer the update and the
847 /// returned future will resolve immediately, without the update actually
848 /// having been written.
849 pub async fn blocking(self, updates: Vec<BuiltinTableUpdate>) {
850 let (notify, _) = self.execute(updates).await;
851 notify.await;
852 }
853}
854
855/// Returns two sides of a "channel" that can be used to notify the coordinator when we want a
856/// group commit to be run.
857pub fn notifier() -> (GroupCommitNotifier, GroupCommitWaiter) {
858 let notify = Arc::new(Notify::new());
859 let in_progress = Arc::new(Semaphore::new(1));
860
861 let notifier = GroupCommitNotifier {
862 notify: Arc::clone(¬ify),
863 };
864 let waiter = GroupCommitWaiter {
865 notify,
866 in_progress,
867 };
868
869 (notifier, waiter)
870}
871
/// A handle that allows us to notify the coordinator that a group commit should be run at some
/// point in the future.
///
/// Cloning is cheap. Every clone shares the same underlying [`Notify`] with the paired
/// [`GroupCommitWaiter`] created by [`notifier`].
#[derive(Debug, Clone)]
pub struct GroupCommitNotifier {
    /// Signal shared with the paired [`GroupCommitWaiter`]. Notifying it requests that a group
    /// commit be run.
    notify: Arc<Notify>,
}
879
880impl GroupCommitNotifier {
881 /// Notifies the [`GroupCommitWaiter`] that we'd like a group commit to be run.
882 pub fn notify(&self) {
883 self.notify.notify_one()
884 }
885}
886
/// A handle that returns a future when a group commit needs to be run, and one is not currently
/// being run.
///
/// Created by [`notifier`] together with its paired [`GroupCommitNotifier`].
#[derive(Debug)]
pub struct GroupCommitWaiter {
    /// Signal shared with the paired [`GroupCommitNotifier`]s. It is notified whenever someone
    /// requests a group commit.
    notify: Arc<Notify>,
    /// Distributes permits that track in-progress group commits. [`notifier`] creates it with a
    /// single permit, so at most one [`GroupCommitPermit`] is outstanding at a time.
    in_progress: Arc<Semaphore>,
}
// Compile-time guard that this handle never becomes `Clone`.
// NOTE(review): presumably this keeps a single consumer of the `notify_one` wakeups; confirm.
static_assertions::assert_not_impl_all!(GroupCommitWaiter: Clone);
897
898impl GroupCommitWaiter {
899 /// Returns a permit for a group commit, once a permit is available _and_ there someone
900 /// requested a group commit to be run.
901 ///
902 /// # Cancel Safety
903 ///
904 /// * Waiting on the returned Future is cancel safe because we acquire an in-progress permit
905 /// before waiting for notifications. If the Future gets dropped after acquiring a permit but
906 /// before a group commit is queued, we'll release the permit which can be acquired by the
907 /// next caller.
908 ///
909 pub async fn ready(&self) -> GroupCommitPermit {
910 let permit = Semaphore::acquire_owned(Arc::clone(&self.in_progress))
911 .await
912 .expect("semaphore should not close");
913
914 // Note: We must wait for notifies _after_ waiting for a permit to be acquired for cancel
915 // safety.
916 self.notify.notified().await;
917
918 GroupCommitPermit {
919 _permit: Some(permit),
920 }
921 }
922}
923
/// A permit to run a group commit; it must be kept alive for the entire duration of the commit.
///
/// Note: We sometimes want to throttle how many group commits are running at once, which this
/// permit allows us to do.
#[derive(Debug)]
pub struct GroupCommitPermit {
    /// Permit that is preventing other group commits from running. Dropping it releases the
    /// permit back to the [`GroupCommitWaiter`]'s semaphore.
    ///
    /// Only `None` if the permit has been moved into a tokio task for waiting.
    _permit: Option<OwnedSemaphorePermit>,
}
935
936/// When we start a [`Session`] we need to update some builtin tables, but we don't want to wait for
937/// these writes to complete for two reasons:
938///
939/// 1. Doing a write can take a relatively long time.
940/// 2. Decoupling the write from the session start allows us to batch multiple writes together, if
941/// sessions are being created with a high frequency.
942///
943/// So, as an optimization we do not wait for these writes to complete. But if a [`Session`] tries
944/// to query any of these builtin objects, we need to block that query on the writes completing to
945/// maintain linearizability.
946///
947/// Warning: this already clears the wait flag (i.e., it calls `clear_builtin_table_updates`).
948///
949/// TODO(peek-seq): After we delete the old peek sequencing, we can remove the first component of
950/// the return tuple.
951pub(crate) fn waiting_on_startup_appends(
952 catalog: &Catalog,
953 session: &mut Session,
954 plan: &Plan,
955) -> Option<(BTreeSet<CatalogItemId>, BoxFuture<'static, ()>)> {
956 // TODO(parkmycar): We need to check transitive uses here too if we ever move the
957 // referenced builtin tables out of mz_internal, or we allow creating views on
958 // mz_internal objects.
959 let depends_on = match plan {
960 Plan::Select(plan) => plan.source.depends_on(),
961 Plan::ReadThenWrite(plan) => plan.selection.depends_on(),
962 Plan::ShowColumns(plan) => plan.select_plan.source.depends_on(),
963 Plan::Subscribe(plan) => plan.from.depends_on(),
964 Plan::ExplainPlan(ExplainPlanPlan {
965 explainee: Explainee::Statement(ExplaineeStatement::Select { plan, .. }),
966 ..
967 }) => plan.source.depends_on(),
968 Plan::ExplainTimestamp(ExplainTimestampPlan { raw_plan, .. }) => raw_plan.depends_on(),
969 Plan::CreateConnection(_)
970 | Plan::CreateDatabase(_)
971 | Plan::CreateSchema(_)
972 | Plan::CreateRole(_)
973 | Plan::CreateNetworkPolicy(_)
974 | Plan::CreateCluster(_)
975 | Plan::CreateClusterReplica(_)
976 | Plan::CreateSource(_)
977 | Plan::CreateSources(_)
978 | Plan::CreateSecret(_)
979 | Plan::CreateSink(_)
980 | Plan::CreateTable(_)
981 | Plan::CreateView(_)
982 | Plan::CreateMaterializedView(_)
983 | Plan::CreateIndex(_)
984 | Plan::CreateType(_)
985 | Plan::Comment(_)
986 | Plan::DiscardTemp
987 | Plan::DiscardAll
988 | Plan::DropObjects(_)
989 | Plan::DropOwned(_)
990 | Plan::EmptyQuery
991 | Plan::ShowAllVariables
992 | Plan::ShowCreate(_)
993 | Plan::ShowVariable(_)
994 | Plan::InspectShard(_)
995 | Plan::SetVariable(_)
996 | Plan::ResetVariable(_)
997 | Plan::SetTransaction(_)
998 | Plan::StartTransaction(_)
999 | Plan::CommitTransaction(_)
1000 | Plan::AbortTransaction(_)
1001 | Plan::CopyFrom(_)
1002 | Plan::CopyTo(_)
1003 | Plan::ExplainPlan(_)
1004 | Plan::ExplainPushdown(_)
1005 | Plan::ExplainSinkSchema(_)
1006 | Plan::Insert(_)
1007 | Plan::AlterNetworkPolicy(_)
1008 | Plan::AlterNoop(_)
1009 | Plan::AlterClusterRename(_)
1010 | Plan::AlterClusterSwap(_)
1011 | Plan::AlterClusterReplicaRename(_)
1012 | Plan::AlterCluster(_)
1013 | Plan::AlterConnection(_)
1014 | Plan::AlterSource(_)
1015 | Plan::AlterSetCluster(_)
1016 | Plan::AlterItemRename(_)
1017 | Plan::AlterRetainHistory(_)
1018 | Plan::AlterSourceTimestampInterval(_)
1019 | Plan::AlterSchemaRename(_)
1020 | Plan::AlterSchemaSwap(_)
1021 | Plan::AlterSecret(_)
1022 | Plan::AlterSink(_)
1023 | Plan::AlterSystemSet(_)
1024 | Plan::AlterSystemReset(_)
1025 | Plan::AlterSystemResetAll(_)
1026 | Plan::AlterRole(_)
1027 | Plan::AlterOwner(_)
1028 | Plan::AlterTableAddColumn(_)
1029 | Plan::AlterMaterializedViewApplyReplacement(_)
1030 | Plan::Declare(_)
1031 | Plan::Fetch(_)
1032 | Plan::Close(_)
1033 | Plan::Prepare(_)
1034 | Plan::Execute(_)
1035 | Plan::Deallocate(_)
1036 | Plan::Raise(_)
1037 | Plan::GrantRole(_)
1038 | Plan::RevokeRole(_)
1039 | Plan::GrantPrivileges(_)
1040 | Plan::RevokePrivileges(_)
1041 | Plan::AlterDefaultPrivileges(_)
1042 | Plan::ReassignOwned(_)
1043 | Plan::ValidateConnection(_)
1044 | Plan::SideEffectingFunc(_) => BTreeSet::default(),
1045 };
1046 let depends_on_required_id = REQUIRED_BUILTIN_TABLES
1047 .iter()
1048 .map(|table| catalog.resolve_builtin_table(&**table))
1049 .any(|id| {
1050 catalog
1051 .get_global_ids(&id)
1052 .any(|gid| depends_on.contains(&gid))
1053 });
1054
1055 // If our plan does not depend on any required ID, then we don't need to
1056 // wait for any builtin writes to occur.
1057 if !depends_on_required_id {
1058 return None;
1059 }
1060
1061 // Even if we depend on a builtin table, there's no need to wait if the
1062 // writes have already completed.
1063 //
1064 // TODO(parkmycar): As an optimization we should add a `Notify` type to
1065 // `mz_ore` that allows peeking. If the builtin table writes have already
1066 // completed then there is no need to defer this plan.
1067 match session.clear_builtin_table_updates() {
1068 Some(wait_future) => {
1069 let depends_on = depends_on
1070 .into_iter()
1071 .map(|gid| catalog.get_entry_by_global_id(&gid).id())
1072 .collect();
1073 Some((depends_on, wait_future.boxed()))
1074 }
1075 None => None,
1076 }
1077}