// mz_storage_controller/collection_mgmt.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Tokio tasks (and support machinery) for maintaining storage-managed
//! collections.
//!
//! We differentiate between append-only collections and differential
//! collections. The intent is that knowing the type allows being more
//! intentional about what state we keep in memory and how we work when in
//! read-only mode / during zero-downtime upgrades.
//!
//! ## Append-only collections
//!
//! Writers only append blind writes. Those writes never fail. It does not
//! matter at what timestamp they happen (to a degree, but ...).
//!
//! While in read-only mode, the append-only write task can immediately write
//! updates out as batches, but only append them when going out of read-only
//! mode.
//!
//! ## Differential collections
//!
//! These are very similar to the self-correcting persist_sink. We have an
//! in-memory desired state and continually make it so that persist matches
//! desired. As described below (in the task implementation), we could do this
//! in a memory-efficient way by keeping open a persist read handle and
//! continually updating/consolidating our desired collection. This way, we
//! would be memory-efficient even in read-only mode.
//!
//! This is an evolution of the current design where, on startup, we bring the
//! persist collection into a known state (mostly by retracting everything) and
//! then assume that this `envd` is the only writer. We panic when that is ever
//! not the case, which we notice when the upper of a collection changes
//! unexpectedly. With this new design we can instead continually update our
//! view of the persist shard and emit updates when needed, that is when
//! desired changes.
//!
//! NOTE: As it is, we always keep all of desired in memory. Only when told to
//! go out of read-only mode would we start attempting to write.
//!
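//! The gist of that reconciliation, as a rough sketch (hypothetical names;
//! the real state lives in `DifferentialWriteTask` below):
//!
//! ```ignore
//! // `desired` is what the collection should contain, `persist` is a snapshot
//! // of what the shard currently contains.
//! let mut to_write: Vec<(Row, Diff)> = desired.clone();
//! to_write.extend(persist.into_iter().map(|(row, diff)| (row, -diff)));
//! // After consolidation only the difference between desired and persist
//! // remains, which is exactly what we need to append to make persist match
//! // desired.
//! consolidation::consolidate(&mut to_write);
//! ```
//!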
//! ## Read-only mode
//!
//! When [`CollectionManager`] is in read-only mode it cannot write out to
//! persist. It will, however, maintain the `desired` state of differential
//! collections so that we can immediately start writing out updates when going
//! out of read-only mode.
//!
//! For append-only collections we either panic, in the case of
//! [`CollectionManager::blind_write`], or report back a
//! [`StorageError::ReadOnly`] when trying to append through a
//! [`MonotonicAppender`] returned from
//! [`CollectionManager::monotonic_appender`].

use std::any::Any;
use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap};
use std::fmt::Debug;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{anyhow, bail};
use chrono::{DateTime, Utc};
use differential_dataflow::consolidation;
use differential_dataflow::lattice::Lattice;
use futures::future::BoxFuture;
use futures::stream::StreamExt;
use futures::{Future, FutureExt};
use mz_cluster_client::ReplicaId;
use mz_dyncfg::ConfigSet;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::retry::Retry;
use mz_ore::soft_panic_or_log;
use mz_ore::task::AbortOnDropHandle;
use mz_persist_client::batch::Added;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::Codec64;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{ColumnName, Diff, GlobalId, Row, TimestampManipulation};
use mz_storage_client::client::{AppendOnlyUpdate, Status, TimestamplessUpdate};
use mz_storage_client::controller::{IntrospectionType, MonotonicAppender, StorageWriteOp};
use mz_storage_client::healthcheck::{
    MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_METRICS_HISTORY_DESC,
    WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC, WALLCLOCK_LAG_HISTORY_DESC,
};
use mz_storage_client::metrics::StorageControllerMetrics;
use mz_storage_client::statistics::ControllerSinkStatistics;
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::InvalidUpper;
use mz_storage_types::dyncfgs::{
    REPLICA_METRICS_HISTORY_RETENTION_INTERVAL, WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL,
    WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL,
};
use mz_storage_types::parameters::{
    STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT, StorageParameters,
};
use mz_storage_types::sources::SourceData;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::{Duration, Instant};
use tracing::{debug, error, info};

use crate::{
    StatusHistoryDesc, StatusHistoryRetentionPolicy, StorageError, collection_mgmt,
    privatelink_status_history_desc, replica_status_history_desc, sink_status_history_desc,
    snapshot_statistics, source_status_history_desc, statistics,
};

// Default rate at which we advance the uppers of managed collections.
const DEFAULT_TICK_MS: u64 = 1_000;

/// A channel for sending writes to a differential collection.
type DifferentialWriteChannel<T> =
    mpsc::UnboundedSender<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>;

/// A channel for sending writes to an append-only collection.
type AppendOnlyWriteChannel<T> = mpsc::UnboundedSender<(
    Vec<AppendOnlyUpdate>,
    oneshot::Sender<Result<(), StorageError<T>>>,
)>;

type WriteTask = AbortOnDropHandle<()>;
type ShutdownSender = oneshot::Sender<()>;

/// Types of storage-managed/introspection collections:
///
/// Append-only: Only accepts blind writes, writes that can be applied at any
/// timestamp and don’t depend on current collection contents.
///
/// Pseudo append-only: We treat them largely as append-only collections but
/// periodically (currently on bootstrap) retract old updates from them.
///
/// Differential: at any given time `t`, the collection contents mirror some
/// (small cardinality) state. The cardinality of the collection stays constant
/// if the thing that is mirrored doesn’t change in cardinality. At steady
/// state, updates always come in pairs of retractions/additions.
pub enum CollectionManagerKind {
    AppendOnly,
    Differential,
}

#[derive(Debug, Clone)]
pub struct CollectionManager<T>
where
    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
    /// When a [`CollectionManager`] is in read-only mode it must not make any
    /// changes to external state.
    read_only: bool,

    // WIP: Name TBD! I thought about `managed_collections`, `ivm_collections`,
    // `self_correcting_collections`.
    /// These are collections that we write to by adding/removing updates to an
    /// internal _desired_ collection. The `CollectionManager` continually makes
    /// sure that collection contents (in persist) match the desired state.
    differential_collections:
        Arc<Mutex<BTreeMap<GlobalId, (DifferentialWriteChannel<T>, WriteTask, ShutdownSender)>>>,

    /// Collections that we only append to using blind-writes.
    ///
    /// Every write succeeds at _some_ timestamp, and we never check what the
    /// actual contents of the collection (in persist) are.
    append_only_collections:
        Arc<Mutex<BTreeMap<GlobalId, (AppendOnlyWriteChannel<T>, WriteTask, ShutdownSender)>>>,

    /// Amount of time we'll wait before sending a batch of inserts to Persist, for user
    /// collections.
    user_batch_duration_ms: Arc<AtomicU64>,
    now: NowFn,
}

/// The `CollectionManager` provides two complementary functions:
/// - Providing an API to append values to a registered set of collections.
///   For this use case:
///     - The `CollectionManager` expects to be the only writer.
///     - Appending to a closed collection panics.
/// - Automatically advancing the timestamp of managed collections every
///   second. For this use case:
///     - The `CollectionManager` handles contention by permitting and ignoring errors.
///     - Closed collections will not panic if they continue receiving these requests.
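///
/// A rough usage sketch (hypothetical names: `append_id` refers to a
/// registered append-only collection and `diff_id` to a registered
/// differential collection; these are not the exact call sites used by the
/// controller):
///
/// ```ignore
/// // Appended at _some_ timestamp; never fails (panics in read-only mode).
/// manager.blind_write(append_id, vec![append_only_update]);
///
/// // Update the desired state; the manager makes the persist shard match it
/// // at some point.
/// manager.differential_append(diff_id, vec![(row, diff)]);
/// ```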
impl<T> CollectionManager<T>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    pub(super) fn new(read_only: bool, now: NowFn) -> CollectionManager<T> {
        let batch_duration_ms: u64 = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT
            .as_millis()
            .try_into()
            .expect("known to fit");

        CollectionManager {
            read_only,
            differential_collections: Arc::new(Mutex::new(BTreeMap::new())),
            append_only_collections: Arc::new(Mutex::new(BTreeMap::new())),
            user_batch_duration_ms: Arc::new(AtomicU64::new(batch_duration_ms)),
            now,
        }
    }

    /// Updates the duration we'll wait to batch events for user owned collections.
    pub fn update_user_batch_duration(&self, duration: Duration) {
        tracing::info!(?duration, "updating user batch duration");
        let millis: u64 = duration.as_millis().try_into().unwrap_or(u64::MAX);
        self.user_batch_duration_ms.store(millis, Ordering::Relaxed);
    }

    /// Registers a new _differential collection_.
    ///
    /// The [CollectionManager] will automatically advance the upper of every
    /// registered collection every second.
    ///
    /// Update the `desired` state of a differential collection using
    /// [Self::differential_write].
    pub(super) fn register_differential_collection<R>(
        &self,
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        read_handle_fn: R,
        force_writable: bool,
        introspection_config: DifferentialIntrospectionConfig<T>,
    ) where
        R: FnMut() -> Pin<
                Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>,
            > + Send
            + Sync
            + 'static,
    {
        let mut guard = self
            .differential_collections
            .lock()
            .expect("collection_mgmt panicked");

        // Check if this collection is already registered.
        if let Some((_writer, task, _shutdown_tx)) = guard.get(&id) {
            // The collection is already registered and the task is still running so nothing to do.
            if !task.is_finished() {
                // TODO(parkmycar): Panic here if we never see this error in production.
                tracing::error!("Registered a collection twice! {id:?}");
                return;
            }
        }

        let read_only = self.get_read_only(id, force_writable);

        // Spawns a new task so we can write to this collection.
        let writer_and_handle = DifferentialWriteTask::spawn(
            id,
            write_handle,
            read_handle_fn,
            read_only,
            self.now.clone(),
            introspection_config,
        );
        let prev = guard.insert(id, writer_and_handle);

        // Double check the previous task was actually finished.
        if let Some((_, prev_task, _)) = prev {
            assert!(
                prev_task.is_finished(),
                "should only spawn a new task if the previous is finished"
            );
        }
    }

    /// Registers a new _append-only collection_.
    ///
    /// The [CollectionManager] will automatically advance the upper of every
    /// registered collection every second.
    pub(super) fn register_append_only_collection(
        &self,
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        force_writable: bool,
        introspection_config: Option<AppendOnlyIntrospectionConfig<T>>,
    ) {
        let mut guard = self
            .append_only_collections
            .lock()
            .expect("collection_mgmt panicked");

        // Check if this collection is already registered.
        if let Some((_writer, task, _shutdown_tx)) = guard.get(&id) {
            // The collection is already registered and the task is still running so nothing to do.
            if !task.is_finished() {
                // TODO(parkmycar): Panic here if we never see this error in production.
                tracing::error!("Registered a collection twice! {id:?}");
                return;
            }
        }

        let read_only = self.get_read_only(id, force_writable);

        // Spawns a new task so we can write to this collection.
        let writer_and_handle = AppendOnlyWriteTask::spawn(
            id,
            write_handle,
            read_only,
            self.now.clone(),
            Arc::clone(&self.user_batch_duration_ms),
            introspection_config,
        );
        let prev = guard.insert(id, writer_and_handle);

        // Double check the previous task was actually finished.
        if let Some((_, prev_task, _)) = prev {
            assert!(
                prev_task.is_finished(),
                "should only spawn a new task if the previous is finished"
            );
        }
    }

    /// Unregisters the given collection.
    ///
    /// Also waits until the `CollectionManager` has completed all outstanding work to ensure that
    /// it has stopped referencing the provided `id`.
    #[mz_ore::instrument(level = "debug")]
    pub(super) fn unregister_collection(&self, id: GlobalId) -> BoxFuture<'static, ()> {
        let prev = self
            .differential_collections
            .lock()
            .expect("CollectionManager panicked")
            .remove(&id);

        // Wait for the task to complete before reporting as unregistered.
        if let Some((_prev_writer, prev_task, shutdown_tx)) = prev {
            // Notify the task it needs to shut down.
            //
            // We can ignore errors here because they indicate the task is already done.
            let _ = shutdown_tx.send(());
            return Box::pin(prev_task.map(|_| ()));
        }

        let prev = self
            .append_only_collections
            .lock()
            .expect("CollectionManager panicked")
            .remove(&id);

        // Wait for the task to complete before reporting as unregistered.
        if let Some((_prev_writer, prev_task, shutdown_tx)) = prev {
            // Notify the task it needs to shut down.
            //
            // We can ignore errors here because they indicate the task is already done.
            let _ = shutdown_tx.send(());
            return Box::pin(prev_task.map(|_| ()));
        }

        Box::pin(futures::future::ready(()))
    }

    /// Returns a sender for writes to the given append-only collection.
    ///
    /// # Panics
    /// - If `id` does not belong to an append-only collection.
    pub(super) fn append_only_write_sender(&self, id: GlobalId) -> AppendOnlyWriteChannel<T> {
        let collections = self.append_only_collections.lock().expect("poisoned");
        match collections.get(&id) {
            Some((tx, _, _)) => tx.clone(),
            None => panic!("missing append-only collection: {id}"),
        }
    }

    /// Returns a sender for writes to the given differential collection.
    ///
    /// # Panics
    /// - If `id` does not belong to a differential collection.
    pub(super) fn differential_write_sender(&self, id: GlobalId) -> DifferentialWriteChannel<T> {
        let collections = self.differential_collections.lock().expect("poisoned");
        match collections.get(&id) {
            Some((tx, _, _)) => tx.clone(),
            None => panic!("missing differential collection: {id}"),
        }
    }

    /// Appends `updates` to the append-only collection identified by `id`, at
    /// _some_ timestamp. Does not wait for the append to complete.
    ///
    /// # Panics
    /// - If `id` does not belong to an append-only collection.
    /// - If this [`CollectionManager`] is in read-only mode.
    /// - If the collection is closed.
    pub(super) fn blind_write(&self, id: GlobalId, updates: Vec<AppendOnlyUpdate>) {
        if self.read_only {
            panic!("attempting blind write to {} while in read-only mode", id);
        }

        if updates.is_empty() {
            return;
        }

        let collections = self.append_only_collections.lock().expect("poisoned");
        match collections.get(&id) {
            Some((update_tx, _, _)) => {
                let (tx, _rx) = oneshot::channel();
                update_tx.send((updates, tx)).expect("rx hung up");
            }
            None => panic!("missing append-only collection: {id}"),
        }
    }

    /// Updates the desired collection state of the differential collection identified by
    /// `id`. The underlying persist shard will reflect this change at
    /// _some_ point. Does not wait for the change to complete.
    ///
    /// # Panics
    /// - If `id` does not belong to a differential collection.
    /// - If the collection is closed.
    pub(super) fn differential_write(&self, id: GlobalId, op: StorageWriteOp) {
        if op.is_empty_append() {
            return;
        }

        let collections = self.differential_collections.lock().expect("poisoned");
        match collections.get(&id) {
            Some((update_tx, _, _)) => {
                let (tx, _rx) = oneshot::channel();
                update_tx.send((op, tx)).expect("rx hung up");
            }
            None => panic!("missing differential collection: {id}"),
        }
    }

    /// Appends the given `updates` to the differential collection identified by `id`.
    ///
    /// # Panics
    /// - If `id` does not belong to a differential collection.
    /// - If the collection is closed.
    pub(super) fn differential_append(&self, id: GlobalId, updates: Vec<(Row, Diff)>) {
        self.differential_write(id, StorageWriteOp::Append { updates })
    }

    /// Returns a [`MonotonicAppender`] that can be used to monotonically append updates to the
    /// collection associated with `id`.
    pub(super) fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<T>, StorageError<T>> {
        let guard = self
            .append_only_collections
            .lock()
            .expect("CollectionManager panicked");
        let tx = guard
            .get(&id)
            .map(|(tx, _, _)| tx.clone())
            .ok_or(StorageError::IdentifierMissing(id))?;

        Ok(MonotonicAppender::new(tx))
    }

    fn get_read_only(&self, id: GlobalId, force_writable: bool) -> bool {
        if force_writable {
            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
            false
        } else {
            self.read_only
        }
    }
}

pub(crate) struct DifferentialIntrospectionConfig<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    pub(crate) recent_upper: Antichain<T>,
    pub(crate) introspection_type: IntrospectionType,
    pub(crate) storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,
    pub(crate) source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    pub(crate) sink_statistics:
        Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    pub(crate) statistics_interval: Duration,
    pub(crate) statistics_interval_receiver: watch::Receiver<Duration>,
    pub(crate) statistics_retention_duration: Duration,
    pub(crate) metrics: StorageControllerMetrics,
    pub(crate) introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,
}

/// A task that will make it so that the state in persist matches the desired
/// state and continuously bump the upper for the specified collection.
///
/// NOTE: This implementation is a bit clunky, and could be optimized by not keeping
/// all of desired in memory (see comment below). It is meant to showcase the
/// general approach.
struct DifferentialWriteTask<T, R>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
    R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>>
        + Send
        + 'static,
{
    /// The collection that we are writing to.
    id: GlobalId,

    write_handle: WriteHandle<SourceData, (), T, StorageDiff>,

    /// For getting a [`ReadHandle`] to sync our state to persist contents.
    read_handle_fn: R,

    read_only: bool,

    now: NowFn,

    /// In the absence of updates, we regularly bump the upper to "now", on this
    /// interval. This makes it so the collection remains readable at recent
    /// timestamps.
    upper_tick_interval: tokio::time::Interval,

    /// Receiver for write commands. These change our desired state.
    cmd_rx: mpsc::UnboundedReceiver<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>,

    /// We have to shut down when receiving from this.
    shutdown_rx: oneshot::Receiver<()>,

    /// The contents of the collection as it should be according to whoever is
    /// driving us around.
    // This is memory inefficient: we always keep a full copy of
    // desired, so that we can re-derive a to_write if/when someone else
    // writes to persist and we notice because of an upper conflict.
    // This is optimized for the case where we rarely have more than one
    // writer.
    //
    // We can optimize for a multi-writer case by keeping an open
    // ReadHandle and continually reading updates from persist, updating
    // a desired in place. Similar to the self-correcting persist_sink.
    desired: Vec<(Row, Diff)>,

    /// Updates that we have to write when next writing to persist. This is
    /// determined by looking at what is desired and what is in persist.
    to_write: Vec<(Row, Diff)>,

    /// Current upper of the persist shard. We keep track of this so that we
    /// realize when someone else writes to the shard, in which case we have to
    /// update our state of the world, that is update our `to_write` based on
    /// `desired` and the contents of the persist shard.
    current_upper: T,
}

impl<T, R> DifferentialWriteTask<T, R>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
    R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>>
        + Send
        + Sync
        + 'static,
{
    /// Spawns a [`DifferentialWriteTask`] in an [`mz_ore::task`] and returns
    /// handles for interacting with it.
    fn spawn(
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        read_handle_fn: R,
        read_only: bool,
        now: NowFn,
        introspection_config: DifferentialIntrospectionConfig<T>,
    ) -> (DifferentialWriteChannel<T>, WriteTask, ShutdownSender) {
        let (tx, rx) = mpsc::unbounded_channel();
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let upper_tick_interval = tokio::time::interval(Duration::from_millis(DEFAULT_TICK_MS));

        let current_upper = T::minimum();

        let task = Self {
            id,
            write_handle,
            read_handle_fn,
            read_only,
            now,
            upper_tick_interval,
            cmd_rx: rx,
            shutdown_rx,
            desired: Vec::new(),
            to_write: Vec::new(),
            current_upper,
        };

        let handle = mz_ore::task::spawn(
            || format!("CollectionManager-differential_write_task-{id}"),
            async move {
                if !task.read_only {
                    task.prepare(introspection_config).await;
                }
                let res = task.run().await;

                match res {
                    ControlFlow::Break(reason) => {
                        info!("write_task-{} ending: {}", id, reason);
                    }
                    c => {
                        unreachable!(
                            "cannot break out of the loop with a Continue, but got: {:?}",
                            c
                        );
                    }
                }
            },
        );

        (tx, handle.abort_on_drop(), shutdown_tx)
    }

    /// Does any work that is required before this background task starts
    /// writing to the given introspection collection.
    ///
    /// This might include consolidation, deleting older entries or seeding
    /// in-memory state of, say, scrapers, with current collection contents.
    async fn prepare(&self, introspection_config: DifferentialIntrospectionConfig<T>) {
        tracing::info!(%self.id, ?introspection_config.introspection_type, "preparing differential introspection collection for writes");

        match introspection_config.introspection_type {
            IntrospectionType::ShardMapping => {
                // Done by the `append_shard_mappings` call.
            }
            IntrospectionType::Frontiers | IntrospectionType::ReplicaFrontiers => {
                // Differential collections start with an empty
                // desired state. No need to manually reset.
            }
            IntrospectionType::StorageSourceStatistics => {
                let prev = snapshot_statistics(
                    self.id,
                    introspection_config.recent_upper,
                    &introspection_config.storage_collections,
                )
                .await;

                let scraper_token = statistics::spawn_statistics_scraper(
                    self.id.clone(),
                    // These do a shallow copy.
                    introspection_config.collection_manager,
                    Arc::clone(&introspection_config.source_statistics),
                    prev,
                    introspection_config.statistics_interval.clone(),
                    introspection_config.statistics_interval_receiver.clone(),
                    introspection_config.statistics_retention_duration,
                    introspection_config.metrics,
                );
                let web_token = statistics::spawn_webhook_statistics_scraper(
                    introspection_config.source_statistics,
                    introspection_config.statistics_interval,
                    introspection_config.statistics_interval_receiver,
                );

                // Make sure these are dropped when the controller is
                // dropped, so that the internal task will stop.
                introspection_config
                    .introspection_tokens
                    .lock()
                    .expect("poisoned")
                    .insert(self.id, Box::new((scraper_token, web_token)));
            }
            IntrospectionType::StorageSinkStatistics => {
                let prev = snapshot_statistics(
                    self.id,
                    introspection_config.recent_upper,
                    &introspection_config.storage_collections,
                )
                .await;

                let scraper_token = statistics::spawn_statistics_scraper(
                    self.id.clone(),
                    introspection_config.collection_manager,
                    Arc::clone(&introspection_config.sink_statistics),
                    prev,
                    introspection_config.statistics_interval,
                    introspection_config.statistics_interval_receiver,
                    introspection_config.statistics_retention_duration,
                    introspection_config.metrics,
                );

                // Make sure this is dropped when the controller is
                // dropped, so that the internal task will stop.
                introspection_config
                    .introspection_tokens
                    .lock()
                    .expect("poisoned")
                    .insert(self.id, scraper_token);
            }

            IntrospectionType::ComputeDependencies
            | IntrospectionType::ComputeOperatorHydrationStatus
            | IntrospectionType::ComputeMaterializedViewRefreshes
            | IntrospectionType::ComputeErrorCounts
            | IntrospectionType::ComputeHydrationTimes => {
                // Differential collections start with an empty
                // desired state. No need to manually reset.
            }

            introspection_type @ IntrospectionType::ReplicaMetricsHistory
            | introspection_type @ IntrospectionType::WallclockLagHistory
            | introspection_type @ IntrospectionType::WallclockLagHistogram
            | introspection_type @ IntrospectionType::PreparedStatementHistory
            | introspection_type @ IntrospectionType::StatementExecutionHistory
            | introspection_type @ IntrospectionType::SessionHistory
            | introspection_type @ IntrospectionType::StatementLifecycleHistory
            | introspection_type @ IntrospectionType::SqlText
            | introspection_type @ IntrospectionType::SourceStatusHistory
            | introspection_type @ IntrospectionType::SinkStatusHistory
            | introspection_type @ IntrospectionType::PrivatelinkConnectionStatusHistory
            | introspection_type @ IntrospectionType::ReplicaStatusHistory => {
                unreachable!("not differential collection: {introspection_type:?}")
            }
        }
    }

    async fn run(mut self) -> ControlFlow<String> {
        const BATCH_SIZE: usize = 4096;
        let mut updates = Vec::with_capacity(BATCH_SIZE);
        loop {
            tokio::select! {
                // Prefer sending actual updates over just bumping the upper,
                // because sending updates also bumps the upper.
                biased;

                // Listen for a shutdown signal so we can gracefully clean up.
                _ = &mut self.shutdown_rx => {
                    self.handle_shutdown();

                    return ControlFlow::Break("graceful shutdown".to_string());
                }

                // Pull a chunk of queued updates off the channel.
                count = self.cmd_rx.recv_many(&mut updates, BATCH_SIZE) => {
                    if count > 0 {
                        let _ = self.handle_updates(&mut updates).await?;
                    } else {
                        // Sender has been dropped, which means the collection
                        // should have been unregistered, break out of the run
                        // loop if we weren't already aborted.
                        return ControlFlow::Break("sender has been dropped".to_string());
                    }
                }

                // If we haven't received any updates, then we'll move the upper forward.
                _ = self.upper_tick_interval.tick() => {
                    if self.read_only {
                        // Not bumping uppers while in read-only mode.
                        continue;
                    }
                    let _ = self.tick_upper().await?;
                },
            }
        }
    }

    async fn tick_upper(&mut self) -> ControlFlow<String> {
        let now = T::from((self.now)());

        if now <= self.current_upper {
            // Upper is already further along than current wall-clock time, no
            // need to bump it.
            return ControlFlow::Continue(());
        }

        assert!(!self.read_only);
        let res = self
            .write_handle
            .compare_and_append_batch(
                &mut [],
                Antichain::from_elem(self.current_upper.clone()),
                Antichain::from_elem(now.clone()),
                true,
            )
            .await
            .expect("valid usage");
        match res {
            // All good!
            Ok(()) => {
                tracing::debug!(%self.id, "bumped upper of differential collection");
                self.current_upper = now;
            }
            Err(err) => {
                // Someone else wrote to the collection or bumped the upper. We
                // need to sync to latest persist state and potentially patch up
                // our `to_write`, based on what we learn and `desired`.

                let actual_upper = if let Some(ts) = err.current.as_option() {
                    ts.clone()
                } else {
                    return ControlFlow::Break("upper is the empty antichain".to_string());
                };

                tracing::info!(%self.id, ?actual_upper, expected_upper = ?self.current_upper, "upper mismatch while bumping upper, syncing to persist state");

                self.current_upper = actual_upper;

                self.sync_to_persist().await;
            }
        }

        ControlFlow::Continue(())
    }

    fn handle_shutdown(&mut self) {
        let mut senders = Vec::new();

        // Prevent new messages from being sent.
        self.cmd_rx.close();

        // Get as many waiting senders as possible.
        while let Ok((_batch, sender)) = self.cmd_rx.try_recv() {
            senders.push(sender);
        }

        // Notify them that this collection is closed.
        //
        // Note: if a task is shutting down, that indicates the source has been
        // dropped, at which point the identifier is invalid. Returning this
        // error provides a better user experience.
        notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));
    }

    async fn handle_updates(
        &mut self,
        batch: &mut Vec<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>,
    ) -> ControlFlow<String> {
        // Put in place _some_ rate limiting.
        let batch_duration_ms = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT;

        let use_batch_now = Instant::now();
        let min_time_to_complete = use_batch_now + batch_duration_ms;

        tracing::debug!(
            ?use_batch_now,
            ?batch_duration_ms,
            ?min_time_to_complete,
            "batch duration",
        );

        let mut responders = Vec::with_capacity(batch.len());
        for (op, tx) in batch.drain(..) {
            self.apply_write_op(op);
            responders.push(tx);
        }

        // TODO: Maybe don't do it every time?
        consolidation::consolidate(&mut self.desired);
        consolidation::consolidate(&mut self.to_write);

        // Reset the interval which is used to periodically bump the uppers
        // because the uppers will get bumped with the following update.
        // This makes it such that we will write at most once every
        // `interval`.
        //
        // For example, let's say our `DEFAULT_TICK` interval is 10, so at
        // `t + 10`, `t + 20`, ... we'll bump the uppers. If we receive an
        // update at `t + 3` we want to shift this window so we bump the
        // uppers at `t + 13`, `t + 23`, ... which resetting the interval
        // accomplishes.
        self.upper_tick_interval.reset();

        self.write_to_persist(responders).await?;

        // Wait until our artificial latency has completed.
        //
        // Note: if writing to persist took longer than `DEFAULT_TICK` this
        // await will resolve immediately.
        tokio::time::sleep_until(min_time_to_complete).await;

        ControlFlow::Continue(())
    }

    /// Apply the given write operation to the `desired`/`to_write` state.
    fn apply_write_op(&mut self, op: StorageWriteOp) {
        match op {
            StorageWriteOp::Append { updates } => {
                self.desired.extend_from_slice(&updates);
                self.to_write.extend(updates);
            }
            StorageWriteOp::Delete { filter } => {
                let to_delete = self.desired.extract_if(.., |(row, _)| filter(row));
                let retractions = to_delete.map(|(row, diff)| (row, -diff));
                self.to_write.extend(retractions);
            }
        }
    }

    /// Attempt to write what is currently in [Self::to_write] to persist,
    /// retrying and re-syncing to persist when necessary, that is when the
    /// upper was not what we expected.
    async fn write_to_persist(
        &mut self,
        responders: Vec<oneshot::Sender<Result<(), StorageError<T>>>>,
    ) -> ControlFlow<String> {
        if self.read_only {
            tracing::debug!(%self.id, "not writing to differential collection: read-only");
            // Not attempting to write while in read-only mode.
            return ControlFlow::Continue(());
        }

        // We'll try really hard to succeed, but eventually stop.
        //
        // Note: it's very rare we should ever need to retry, and if we need to
        // retry it should only take 1 or 2 attempts. We set `max_tries` to be
        // high though because if we hit some edge case we want to try hard to
        // commit the data.
        let retries = Retry::default()
            .initial_backoff(Duration::from_secs(1))
            .clamp_backoff(Duration::from_secs(3))
            .factor(1.25)
            .max_tries(20)
            .into_retry_stream();
        let mut retries = Box::pin(retries);

        loop {
            // Append updates to persist!
            let now = T::from((self.now)());
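            // Pick an upper that is at least one step past the current upper,
            // so the append below always strictly advances the frontier even
            // if the wall clock has not moved past `current_upper` yet.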
            let new_upper = std::cmp::max(
                now,
                TimestampManipulation::step_forward(&self.current_upper),
            );

            let updates_to_write = self
                .to_write
                .iter()
                .map(|(row, diff)| {
                    (
                        (SourceData(Ok(row.clone())), ()),
                        self.current_upper.clone(),
                        diff.into_inner(),
                    )
                })
                .collect::<Vec<_>>();

            assert!(!self.read_only);
            let res = self
                .write_handle
                .compare_and_append(
                    updates_to_write,
                    Antichain::from_elem(self.current_upper.clone()),
                    Antichain::from_elem(new_upper.clone()),
                )
                .await
                .expect("valid usage");
            match res {
                // Everything was successful!
                Ok(()) => {
                    // Notify all of our listeners.
                    notify_listeners(responders, || Ok(()));

                    self.current_upper = new_upper;

                    // Very important! This is empty at steady state, while
                    // desired keeps an in-memory copy of desired state.
                    self.to_write.clear();

                    tracing::debug!(%self.id, "appended to differential collection");

                    // Break out of the retry loop so we can wait for more data.
                    break;
                }
                // Failed to write to the collection.
                Err(err) => {
                    // Someone else wrote to the collection. We need to read
                    // from persist and update to_write based on that and the
                    // desired state.
                    let actual_upper = if let Some(ts) = err.current.as_option() {
                        ts.clone()
                    } else {
                        return ControlFlow::Break("upper is the empty antichain".to_string());
                    };

                    tracing::info!(%self.id, ?actual_upper, expected_upper = ?self.current_upper, "retrying append for differential collection");

                    // If we've exhausted all of our retries, notify listeners
                    // and break out of the retry loop so we can wait for more data.
                    if retries.next().await.is_none() {
                        let invalid_upper = InvalidUpper {
                            id: self.id,
                            current_upper: err.current,
                        };
                        notify_listeners(responders, || {
                            Err(StorageError::InvalidUppers(vec![invalid_upper.clone()]))
                        });
                        error!(
                            "exhausted retries when appending to managed collection {}",
                            self.id
                        );
                        break;
                    }

                    self.current_upper = actual_upper;

                    self.sync_to_persist().await;

                    debug!(
                        "Retrying invalid-uppers error while appending to differential collection {}",
                        self.id
                    );
                }
            }
        }

        ControlFlow::Continue(())
    }

    /// Re-derives [Self::to_write] by looking at [Self::desired] and the
    /// current state in persist. We want to insert everything in desired and
    /// retract everything in persist. But ideally most of that cancels out in
    /// consolidation.
    ///
    /// To be called when a `compare_and_append` failed because the upper didn't
    /// match what we expected.
    async fn sync_to_persist(&mut self) {
        let mut read_handle = (self.read_handle_fn)().await;
        let as_of = self
            .current_upper
            .step_back()
            .unwrap_or_else(|| T::minimum());
        let as_of = Antichain::from_elem(as_of);
        let snapshot = read_handle.snapshot_and_fetch(as_of).await;

        let mut negated_oks = match snapshot {
            Ok(contents) => {
                let mut snapshot = Vec::with_capacity(contents.len());
                for ((data, _), _, diff) in contents {
                    let row = data.0.unwrap();
                    snapshot.push((row, -Diff::from(diff)));
                }
                snapshot
            }
            Err(_) => panic!("read before since"),
        };

        self.to_write.clear();
        self.to_write.extend(self.desired.iter().cloned());
        self.to_write.append(&mut negated_oks);
        consolidation::consolidate(&mut self.to_write);
    }
}

pub(crate) struct AppendOnlyIntrospectionConfig<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    pub(crate) introspection_type: IntrospectionType,
    pub(crate) config_set: Arc<ConfigSet>,
    pub(crate) parameters: StorageParameters,
    pub(crate) storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
}

/// A task that writes to an append-only collection and continuously bumps the upper for the specified
/// collection.
///
/// For status history collections, this task can deduplicate redundant [`Statuses`](Status).
struct AppendOnlyWriteTask<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    /// The collection that we are writing to.
    id: GlobalId,
    write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
    read_only: bool,
    now: NowFn,
    user_batch_duration_ms: Arc<AtomicU64>,
    /// Receiver for write commands.
    rx: mpsc::UnboundedReceiver<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<T>>>,
    )>,

    /// We have to shut down when receiving from this.
    shutdown_rx: oneshot::Receiver<()>,
    /// If this collection deduplicates statuses, this map is used to track the previous status.
    previous_statuses: Option<BTreeMap<(GlobalId, Option<ReplicaId>), Status>>,
}

impl<T> AppendOnlyWriteTask<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    /// Spawns an [`AppendOnlyWriteTask`] in an [`mz_ore::task`] that will continuously bump the
    /// upper for the specified collection,
    /// and append data that is sent via the provided [`mpsc::UnboundedSender`].
    ///
    /// TODO(parkmycar): One day if we want to customize the tick interval for each collection, that
    /// should be done here.
    /// TODO(parkmycar): Maybe add prometheus metrics for each collection?
    fn spawn(
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        read_only: bool,
        now: NowFn,
        user_batch_duration_ms: Arc<AtomicU64>,
        introspection_config: Option<AppendOnlyIntrospectionConfig<T>>,
    ) -> (AppendOnlyWriteChannel<T>, WriteTask, ShutdownSender) {
        let (tx, rx) = mpsc::unbounded_channel();
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let previous_statuses: Option<BTreeMap<(GlobalId, Option<ReplicaId>), Status>> =
            match introspection_config
                .as_ref()
                .map(|config| config.introspection_type)
            {
                Some(IntrospectionType::SourceStatusHistory)
                | Some(IntrospectionType::SinkStatusHistory) => Some(BTreeMap::new()),

                Some(IntrospectionType::ReplicaMetricsHistory)
                | Some(IntrospectionType::WallclockLagHistory)
                | Some(IntrospectionType::WallclockLagHistogram)
                | Some(IntrospectionType::PrivatelinkConnectionStatusHistory)
                | Some(IntrospectionType::ReplicaStatusHistory)
                | Some(IntrospectionType::PreparedStatementHistory)
                | Some(IntrospectionType::StatementExecutionHistory)
                | Some(IntrospectionType::SessionHistory)
                | Some(IntrospectionType::StatementLifecycleHistory)
                | Some(IntrospectionType::SqlText)
                | None => None,

                Some(introspection_type @ IntrospectionType::ShardMapping)
                | Some(introspection_type @ IntrospectionType::Frontiers)
                | Some(introspection_type @ IntrospectionType::ReplicaFrontiers)
                | Some(introspection_type @ IntrospectionType::StorageSourceStatistics)
                | Some(introspection_type @ IntrospectionType::StorageSinkStatistics)
                | Some(introspection_type @ IntrospectionType::ComputeDependencies)
                | Some(introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus)
                | Some(introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes)
                | Some(introspection_type @ IntrospectionType::ComputeErrorCounts)
                | Some(introspection_type @ IntrospectionType::ComputeHydrationTimes) => {
                    unreachable!("not append-only collection: {introspection_type:?}")
                }
            };

        let mut task = Self {
            id,
            write_handle,
            rx,
            shutdown_rx,
            read_only,
            now,
            user_batch_duration_ms,
            previous_statuses,
        };

        let handle = mz_ore::task::spawn(
            || format!("CollectionManager-append_only_write_task-{id}"),
            async move {
                if !task.read_only {
                    task.prepare(introspection_config).await;
                }
                task.run().await;
            },
        );

        (tx, handle.abort_on_drop(), shutdown_tx)
    }

    /// Does any work that is required before the background task starts
    /// writing to the given append-only introspection collection.
    ///
    /// This might include consolidation or deleting older entries.
1162    async fn prepare(&mut self, introspection_config: Option<AppendOnlyIntrospectionConfig<T>>) {
1163        let Some(AppendOnlyIntrospectionConfig {
1164            introspection_type,
1165            config_set,
1166            parameters,
1167            storage_collections,
1168        }) = introspection_config
1169        else {
1170            return;
1171        };
1172        let initial_statuses = match introspection_type {
1173            IntrospectionType::ReplicaMetricsHistory
1174            | IntrospectionType::WallclockLagHistory
1175            | IntrospectionType::WallclockLagHistogram => {
1176                let result = partially_truncate_metrics_history(
1177                    self.id,
1178                    introspection_type,
1179                    &mut self.write_handle,
1180                    config_set,
1181                    self.now.clone(),
1182                    storage_collections,
1183                )
1184                .await;
1185                if let Err(error) = result {
1186                    soft_panic_or_log!(
1187                        "error truncating metrics history: {error} (type={introspection_type:?})"
1188                    );
1189                }
1190                Vec::new()
1191            }
1192
1193            IntrospectionType::PrivatelinkConnectionStatusHistory => {
1194                partially_truncate_status_history(
1195                    self.id,
1196                    IntrospectionType::PrivatelinkConnectionStatusHistory,
1197                    &mut self.write_handle,
1198                    privatelink_status_history_desc(&parameters),
1199                    self.now.clone(),
1200                    &storage_collections,
1201                )
1202                .await;
1203                Vec::new()
1204            }
1205            IntrospectionType::ReplicaStatusHistory => {
1206                partially_truncate_status_history(
1207                    self.id,
1208                    IntrospectionType::ReplicaStatusHistory,
1209                    &mut self.write_handle,
1210                    replica_status_history_desc(&parameters),
1211                    self.now.clone(),
1212                    &storage_collections,
1213                )
1214                .await;
1215                Vec::new()
1216            }
1217
1218            // Note [btv] - we don't truncate these, because that uses
1219            // a huge amount of memory on environmentd startup.
1220            IntrospectionType::PreparedStatementHistory
1221            | IntrospectionType::StatementExecutionHistory
1222            | IntrospectionType::SessionHistory
1223            | IntrospectionType::StatementLifecycleHistory
1224            | IntrospectionType::SqlText => {
1225                // NOTE(aljoscha): We never remove from these
1226                // collections. Someone, at some point needs to
1227                // think about that! Issue:
1228                // https://github.com/MaterializeInc/database-issues/issues/7666
1229                Vec::new()
1230            }
1231
1232            IntrospectionType::SourceStatusHistory => {
1233                let last_status_per_id = partially_truncate_status_history(
1234                    self.id,
1235                    IntrospectionType::SourceStatusHistory,
1236                    &mut self.write_handle,
1237                    source_status_history_desc(&parameters),
1238                    self.now.clone(),
1239                    &storage_collections,
1240                )
1241                .await;
1242
1243                let status_col = MZ_SOURCE_STATUS_HISTORY_DESC
1244                    .get_by_name(&ColumnName::from("status"))
1245                    .expect("schema has not changed")
1246                    .0;
1247
1248                last_status_per_id
1249                    .into_iter()
1250                    .map(|(id, row)| {
1251                        (
1252                            id,
1253                            Status::from_str(
1254                                row.iter()
1255                                    .nth(status_col)
1256                                    .expect("schema has not changed")
1257                                    .unwrap_str(),
1258                            )
1259                            .expect("statuses must be uncorrupted"),
1260                        )
1261                    })
1262                    .collect()
1263            }
1264            IntrospectionType::SinkStatusHistory => {
1265                let last_status_per_id = partially_truncate_status_history(
1266                    self.id,
1267                    IntrospectionType::SinkStatusHistory,
1268                    &mut self.write_handle,
1269                    sink_status_history_desc(&parameters),
1270                    self.now.clone(),
1271                    &storage_collections,
1272                )
1273                .await;
1274
1275                let status_col = MZ_SINK_STATUS_HISTORY_DESC
1276                    .get_by_name(&ColumnName::from("status"))
1277                    .expect("schema has not changed")
1278                    .0;
1279
1280                last_status_per_id
1281                    .into_iter()
1282                    .map(|(id, row)| {
1283                        (
1284                            id,
1285                            Status::from_str(
1286                                row.iter()
1287                                    .nth(status_col)
1288                                    .expect("schema has not changed")
1289                                    .unwrap_str(),
1290                            )
1291                            .expect("statuses must be uncorrupted"),
1292                        )
1293                    })
1294                    .collect()
1295            }
1296
1297            introspection_type @ IntrospectionType::ShardMapping
1298            | introspection_type @ IntrospectionType::Frontiers
1299            | introspection_type @ IntrospectionType::ReplicaFrontiers
1300            | introspection_type @ IntrospectionType::StorageSourceStatistics
1301            | introspection_type @ IntrospectionType::StorageSinkStatistics
1302            | introspection_type @ IntrospectionType::ComputeDependencies
1303            | introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus
1304            | introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes
1305            | introspection_type @ IntrospectionType::ComputeErrorCounts
1306            | introspection_type @ IntrospectionType::ComputeHydrationTimes => {
1307                unreachable!("not append-only collection: {introspection_type:?}")
1308            }
1309        };
1310        if let Some(previous_statuses) = &mut self.previous_statuses {
1311            previous_statuses.extend(initial_statuses);
1312        }
1313    }
1314
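    /// Runs the write task for this collection until it is shut down.
    ///
    /// The loop prefers draining queued updates over idle upper bumps: queued
    /// batches are appended to persist (rate-limited by the configured batch
    /// duration), and only when no updates arrive does the periodic tick bump
    /// the collection's upper. In read-only mode nothing is written; appends
    /// are rejected with [`StorageError::ReadOnly`] and upper bumps are
    /// skipped.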
1315    async fn run(mut self) {
1316        let mut interval = tokio::time::interval(Duration::from_millis(DEFAULT_TICK_MS));
1317
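        // Upper bound on the number of queued (updates, responder) pairs we
        // pull off the channel per `recv_many` call.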
1318        const BATCH_SIZE: usize = 4096;
1319        let mut batch: Vec<(Vec<_>, _)> = Vec::with_capacity(BATCH_SIZE);
1320
1321        'run: loop {
1322            tokio::select! {
1323                // Prefer sending actual updates over just bumping the upper, because sending
1324                // updates also bumps the upper.
1325                biased;
1326
1327                // Listen for a shutdown signal so we can gracefully cleanup.
1328                _ = &mut self.shutdown_rx => {
1329                    let mut senders = Vec::new();
1330
1331                    // Prevent new messages from being sent.
1332                    self.rx.close();
1333
1334                    // Get as many waiting senders as possible.
1335                    while let Ok((_batch, sender)) = self.rx.try_recv() {
1336                        senders.push(sender);
1337                    }
1338
1339                    // Notify them that this collection is closed.
1340                    //
1341                    // Note: if a task is shutting down, that indicates the source has been
1342                    // dropped, at which point the identifier is invalid. Returning this
1343                    // error provides a better user experience.
1344                    notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));
1345
1346                    break 'run;
1347                }
1348
1349                // Pull a chunk of queued updates off the channel.
1350                count = self.rx.recv_many(&mut batch, BATCH_SIZE) => {
1351                    if count > 0 {
1352                        // To rate-limit appends to persist we add artificial latency: we will
1353                        // finish processing this batch no sooner than `min_time_to_complete`.
1354                        let batch_duration_ms = match self.id {
1355                            GlobalId::User(_) => Duration::from_millis(
1356                                self.user_batch_duration_ms.load(Ordering::Relaxed),
1357                            ),
1358                            // For non-user collections, always just use the default.
1359                            _ => STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT,
1360                        };
1361                        let use_batch_now = Instant::now();
1362                        let min_time_to_complete = use_batch_now + batch_duration_ms;
1363
1364                        tracing::debug!(
1365                            ?use_batch_now,
1366                            ?batch_duration_ms,
1367                            ?min_time_to_complete,
1368                            "batch duration",
1369                        );
1370
1371                        // Reset the interval that is used to periodically bump the uppers,
1372                        // because the uppers will get bumped by the following update anyway.
1373                        // This ensures we write at most once every `interval`.
1374                        //
1375                        // For example, say our `DEFAULT_TICK_MS` interval is 10, so at
1376                        // `t + 10`, `t + 20`, ... we'll bump the uppers. If we receive an
1377                        // update at `t + 3` we want to shift this window so we bump the uppers
1378                        // at `t + 13`, `t + 23`, ..., which resetting the interval accomplishes.
1379                        interval.reset();
1380
1381
1382                        let capacity: usize = batch
1383                            .iter()
1384                            .map(|(rows, _)| rows.len())
1385                            .sum();
1386                        let mut all_rows = Vec::with_capacity(capacity);
1387                        let mut responders = Vec::with_capacity(batch.len());
1388
1389                        for (updates, responder) in batch.drain(..) {
1390                            let rows = self.process_updates(updates);
1391
1392                            all_rows.extend(
1393                                rows.map(|(row, diff)| TimestamplessUpdate { row, diff }),
1394                            );
1395                            responders.push(responder);
1396                        }
1397
1398                        if self.read_only {
1399                            tracing::warn!(%self.id, ?all_rows, "append while in read-only mode");
1400                            notify_listeners(responders, || Err(StorageError::ReadOnly));
1401                            continue;
1402                        }
1403
1404                        // Append updates to persist!
1405                        let at_least = T::from((self.now)());
1406
1407                        if !all_rows.is_empty() {
1408                            monotonic_append(&mut self.write_handle, all_rows, at_least).await;
1409                        }
1410                        // Notify all of our listeners.
1411                        notify_listeners(responders, || Ok(()));
1412
1413                        // Wait until our artificial latency has completed.
1414                        //
1415                        // Note: if writing to persist took longer than the batch duration,
1416                        // this await will resolve immediately.
1417                        tokio::time::sleep_until(min_time_to_complete).await;
1418                    } else {
1419                        // Sender has been dropped, which means the collection should have been
1420                        // unregistered, break out of the run loop if we weren't already
1421                        // aborted.
1422                        break 'run;
1423                    }
1424                }
1425
1426                // If we haven't received any updates, then we'll move the upper forward.
1427                _ = interval.tick() => {
1428                    if self.read_only {
1429                        // Not bumping uppers while in read-only mode.
1430                        continue;
1431                    }
1432
1433                    // Update our collection.
1434                    let now = T::from((self.now)());
1435                    let updates = vec![];
1436                    let at_least = now.clone();
1437
1438                    // Failures don't matter when advancing collections' uppers. This might
1439                    // fail when a clusterd happens to be writing to this collection concurrently.
1440                    // Advancing uppers here is best-effort and only needs to succeed if no
1441                    // one else is advancing it; contention proves otherwise.
1442                    monotonic_append(&mut self.write_handle, updates, at_least).await;
1443                },
1444            }
1445        }
1446
1447        info!("write_task-{} ending", self.id);
1448    }
1449
1450    /// Deduplicates any [`mz_storage_client::client::StatusUpdate`] within `updates` and converts
1451    /// `updates` to rows and diffs.
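    ///
    /// For collections that track statuses (`previous_statuses` is `Some`), a
    /// status update is kept only if there is no previously recorded status
    /// for the same `(id, replica_id)` pair or if it supersedes that status;
    /// plain row updates are always passed through unchanged.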
1452    fn process_updates(
1453        &mut self,
1454        updates: Vec<AppendOnlyUpdate>,
1455    ) -> impl Iterator<Item = (Row, Diff)> {
1456        let updates = if let Some(previous_statuses) = &mut self.previous_statuses {
1457            let new: Vec<_> = updates
1458                .into_iter()
1459                .filter(|r| match r {
1460                    AppendOnlyUpdate::Row(_) => true,
1461                    AppendOnlyUpdate::Status(update) => {
1462                        match (
1463                            previous_statuses
1464                                .get(&(update.id, update.replica_id))
1465                                .as_deref(),
1466                            &update.status,
1467                        ) {
1468                            (None, _) => true,
1469                            (Some(old), new) => old.superseded_by(*new),
1470                        }
1471                    }
1472                })
1473                .collect();
1474            previous_statuses.extend(new.iter().filter_map(|update| match update {
1475                AppendOnlyUpdate::Row(_) => None,
1476                AppendOnlyUpdate::Status(update) => {
1477                    Some(((update.id, update.replica_id), update.status))
1478                }
1479            }));
1480            new
1481        } else {
1482            updates
1483        };
1484
1485        updates.into_iter().map(AppendOnlyUpdate::into_row)
1486    }
1487}
1488
1489/// Truncates the given metrics history by removing all entries older than that history's
1490/// configured retention interval.
1491///
1492/// # Panics
1493///
1494/// Panics if `collection` is not a metrics history.
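///
/// # Examples
///
/// A minimal sketch (not a doc-test) of how a caller might truncate the
/// replica metrics history; the `id`, `write_handle`, `config_set`, `now`,
/// and `storage_collections` bindings are assumed to already exist.
///
/// ```ignore
/// partially_truncate_metrics_history(
///     id,
///     IntrospectionType::ReplicaMetricsHistory,
///     &mut write_handle,
///     Arc::clone(&config_set),
///     now.clone(),
///     Arc::clone(&storage_collections),
/// )
/// .await?;
/// ```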
1495async fn partially_truncate_metrics_history<T>(
1496    id: GlobalId,
1497    introspection_type: IntrospectionType,
1498    write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
1499    config_set: Arc<ConfigSet>,
1500    now: NowFn,
1501    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
1502) -> Result<(), anyhow::Error>
1503where
1504    T: Codec64 + From<EpochMillis> + TimestampManipulation,
1505{
1506    let (keep_duration, occurred_at_col) = match introspection_type {
1507        IntrospectionType::ReplicaMetricsHistory => (
1508            REPLICA_METRICS_HISTORY_RETENTION_INTERVAL.get(&config_set),
1509            REPLICA_METRICS_HISTORY_DESC
1510                .get_by_name(&ColumnName::from("occurred_at"))
1511                .expect("schema has not changed")
1512                .0,
1513        ),
1514        IntrospectionType::WallclockLagHistory => (
1515            WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL.get(&config_set),
1516            WALLCLOCK_LAG_HISTORY_DESC
1517                .get_by_name(&ColumnName::from("occurred_at"))
1518                .expect("schema has not changed")
1519                .0,
1520        ),
1521        IntrospectionType::WallclockLagHistogram => (
1522            WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL.get(&config_set),
1523            WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC
1524                .get_by_name(&ColumnName::from("period_start"))
1525                .expect("schema has not changed")
1526                .0,
1527        ),
1528        _ => panic!("not a metrics history: {introspection_type:?}"),
1529    };
1530
1531    let upper = write_handle.fetch_recent_upper().await;
1532    let Some(upper_ts) = upper.as_option() else {
1533        bail!("collection is sealed");
1534    };
1535    let Some(as_of_ts) = upper_ts.step_back() else {
1536        return Ok(()); // nothing to truncate
1537    };
1538
1539    let mut rows = storage_collections
1540        .snapshot_cursor(id, as_of_ts)
1541        .await
1542        .map_err(|e| anyhow!("reading snapshot: {e:?}"))?;
1543
1544    let now = mz_ore::now::to_datetime(now());
1545    let keep_since = now - keep_duration;
1546
1547    // It is very important that we append our retractions at the timestamp
1548    // right after the timestamp at which we got our snapshot. Otherwise,
1549    // it's possible for someone else to sneak in retractions or other
1550    // unexpected changes.
1551    let old_upper_ts = upper_ts.clone();
1552    let new_upper_ts = TimestampManipulation::step_forward(&old_upper_ts);
1553
1554    // Produce retractions by inverting diffs of rows we want to delete.
1555    let mut builder = write_handle.builder(Antichain::from_elem(old_upper_ts.clone()));
1556    while let Some(chunk) = rows.next().await {
1557        for (data, _t, diff) in chunk {
1558            let Ok(row) = &data.0 else { continue };
1559            let datums = row.unpack();
1560            let occurred_at = datums[occurred_at_col].unwrap_timestamptz();
1561            if *occurred_at >= keep_since {
1562                continue;
1563            }
1564            let diff = -diff;
1565            match builder.add(&data, &(), &old_upper_ts, &diff).await? {
1566                Added::Record => {}
1567                Added::RecordAndParts => {
1568                    debug!(?id, "added part to builder");
1569                }
1570            }
1571        }
1572    }
1573
1574    let mut updates = builder
1575        .finish(Antichain::from_elem(new_upper_ts.clone()))
1576        .await?;
1577    let mut batches = vec![&mut updates];
1578
1579    write_handle
1580        .compare_and_append_batch(
1581            batches.as_mut_slice(),
1582            Antichain::from_elem(old_upper_ts),
1583            Antichain::from_elem(new_upper_ts),
1584            true,
1585        )
1586        .await
1587        .expect("valid usage")
1588        .map_err(|e| anyhow!("appending retractions: {e:?}"))
1589}
1590
1591/// Effectively truncates the status history shard based on its retention policy.
1592///
1593/// NOTE: The history collections are really append-only collections, but
1594/// every now and then we want to retract old updates so that the collection
1595/// does not grow without bound. Crucially, these are _not_ incremental
1596/// collections: they are not derived from a state at some time `t` and we
1597/// cannot maintain a desired state for them.
1598///
1599/// Returns a map with the latest unpacked row per key.
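///
/// # Examples
///
/// A minimal sketch (not a doc-test), mirroring how the append-only write
/// task truncates the source status history on startup; all bindings are
/// assumed to already exist.
///
/// ```ignore
/// let last_status_per_id = partially_truncate_status_history(
///     id,
///     IntrospectionType::SourceStatusHistory,
///     &mut write_handle,
///     source_status_history_desc(&parameters),
///     now.clone(),
///     &storage_collections,
/// )
/// .await;
/// ```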
1600pub(crate) async fn partially_truncate_status_history<T, K>(
1601    id: GlobalId,
1602    introspection_type: IntrospectionType,
1603    write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
1604    status_history_desc: StatusHistoryDesc<K>,
1605    now: NowFn,
1606    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
1607) -> BTreeMap<K, Row>
1608where
1609    T: Codec64 + From<EpochMillis> + TimestampManipulation,
1610    K: Clone + Debug + Ord + Send + Sync,
1611{
1612    let upper = write_handle.fetch_recent_upper().await.clone();
1613
1614    let mut rows = match upper.as_option() {
1615        Some(f) if f > &T::minimum() => {
1616            let as_of = f.step_back().unwrap();
1617
1618            storage_collections
1619                .snapshot_cursor(id, as_of)
1620                .await
1621                .expect("snapshot succeeds")
1622        }
1623        // If the collection is closed or its frontier is still the minimum,
1624        // we cannot or don't need to truncate (respectively).
1625        _ => return BTreeMap::new(),
1626    };
1627
1628    // BTreeMap to keep track of the row with the latest timestamp for each key.
1629    let mut latest_row_per_key: BTreeMap<K, (CheckedTimestamp<DateTime<Utc>>, Row)> =
1630        BTreeMap::new();
1631
1632    // It is very important that we append our retractions at the timestamp
1633    // right after the timestamp at which we got our snapshot. Otherwise,
1634    // it's possible for someone else to sneak in retractions or other
1635    // unexpected changes.
1636    let expected_upper = upper.into_option().expect("checked above");
1637    let new_upper = TimestampManipulation::step_forward(&expected_upper);
1638
1639    let mut deletions = write_handle.builder(Antichain::from_elem(expected_upper.clone()));
1640
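    // Closure that records the latest row seen per key and returns the
    // `(key, timestamp)` extracted from the given row. It panics on
    // non-positive diffs, since we only know how to operate on consolidated
    // data.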
1641    let mut handle_row = {
1642        let latest_row_per_key = &mut latest_row_per_key;
1643        move |row: &Row, diff| {
1644            let datums = row.unpack();
1645            let key = (status_history_desc.extract_key)(&datums);
1646            let timestamp = (status_history_desc.extract_time)(&datums);
1647
1648            assert!(
1649                diff > 0,
1650                "only know how to operate over consolidated data with diffs > 0, \
1651                    found diff {diff} for object {key:?} in {introspection_type:?}",
1652            );
1653
1654            // Keep track of the timestamp of the latest row per key.
1655            match latest_row_per_key.get(&key) {
1656                Some(existing) if &existing.0 > &timestamp => {}
1657                _ => {
1658                    latest_row_per_key.insert(key.clone(), (timestamp, row.clone()));
1659                }
1660            };
1661            (key, timestamp)
1662        }
1663    };
1664
1665    match status_history_desc.retention_policy {
1666        StatusHistoryRetentionPolicy::LastN(n) => {
1667            // For each key, a min-heap (`BinaryHeap` of `Reverse`d entries) of the retained entries; popping yields the oldest.
1668            let mut last_n_entries_per_key: BTreeMap<
1669                K,
1670                BinaryHeap<Reverse<(CheckedTimestamp<DateTime<Utc>>, Row)>>,
1671            > = BTreeMap::new();
1672
1673            while let Some(chunk) = rows.next().await {
1674                for (data, _t, diff) in chunk {
1675                    let Ok(row) = &data.0 else { continue };
1676                    let (key, timestamp) = handle_row(row, diff);
1677
1678                    // Duplicate rows ARE possible if many status changes happen in VERY quick succession,
1679                    // so we handle duplicated rows separately.
1680                    let entries = last_n_entries_per_key.entry(key).or_default();
1681                    for _ in 0..diff {
1682                        // We CAN have multiple statuses (most likely Starting and Running) at the exact same
1683                        // millisecond, depending on how the `health_operator` is scheduled.
1684                        //
1685                        // Note that these will be arbitrarily ordered, so a Starting event might
1686                        // survive and a Running one won't. The next restart will remove the other,
1687                        // so we don't bother being careful about it.
1688                        //
1689                        // TODO(guswynn): unpack these into health-status objects and use
1690                        // their `Ord` impl.
1691                        entries.push(Reverse((timestamp, row.clone())));
1692
1693                        // Retain some number of entries, using pop to mark the oldest entries for
1694                        // deletion.
1695                        while entries.len() > n {
1696                            if let Some(Reverse((_, r))) = entries.pop() {
1697                                deletions
1698                                    .add(&SourceData(Ok(r)), &(), &expected_upper, &-1)
1699                                    .await
1700                                    .expect("usage should be valid");
1701                            }
1702                        }
1703                    }
1704                }
1705            }
1706        }
1707        StatusHistoryRetentionPolicy::TimeWindow(time_window) => {
1708            // Get the lower bound of our retention window
1709            let now = mz_ore::now::to_datetime(now());
1710            let keep_since = now - time_window;
1711
1712            // Mark any row outside the retention window for deletion
1713            while let Some(chunk) = rows.next().await {
1714                for (data, _t, diff) in chunk {
1715                    let Ok(row) = &data.0 else { continue };
1716                    let (_, timestamp) = handle_row(row, diff);
1717
1718                    if *timestamp < keep_since {
1719                        deletions
1720                            .add(&data, &(), &expected_upper, &-1)
1721                            .await
1722                            .expect("usage should be valid");
1723                    }
1724                }
1725            }
1726        }
1727    }
1728
1729    let mut updates = deletions
1730        .finish(Antichain::from_elem(new_upper.clone()))
1731        .await
1732        .expect("expected valid usage");
1733    let mut batches = vec![&mut updates];
1734
1735    // Updates are only deletes because everything else is already in the shard.
1736    let res = write_handle
1737        .compare_and_append_batch(
1738            batches.as_mut_slice(),
1739            Antichain::from_elem(expected_upper.clone()),
1740            Antichain::from_elem(new_upper),
1741            true,
1742        )
1743        .await
1744        .expect("usage was valid");
1745
1746    match res {
1747        Ok(_) => {
1748            // All good, yay!
1749        }
1750        Err(err) => {
1751            // This is fine; it just means the upper moved because
1752            // of continual upper advancement or because someone
1753            // already appended some more retractions/updates.
1754            //
1755            // NOTE: We might want to attempt these partial
1756            // retractions on an interval, instead of only when
1757            // starting up!
1758            info!(
1759                %id, ?expected_upper, current_upper = ?err.current,
1760                "failed to append partial truncation",
1761            );
1762        }
1763    }
1764
1765    latest_row_per_key
1766        .into_iter()
1767        .map(|(key, (_, row))| (key, row))
1768        .collect()
1769}
1770
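/// Appends `updates` to the collection behind `write_handle` at a single
/// timestamp that is at least `at_least` and at least the collection's
/// current upper, retrying on upper mismatches until the append succeeds.
///
/// An empty `updates` vector merely advances the collection's upper (a no-op
/// for already-closed collections).
///
/// A minimal sketch (not a doc-test) of the upper-bump case, as used by the
/// write task's periodic tick; `write_handle` and `now` are assumed to exist.
///
/// ```ignore
/// monotonic_append(&mut write_handle, vec![], T::from(now())).await;
/// ```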
1771async fn monotonic_append<T: Timestamp + Lattice + Codec64 + TimestampManipulation>(
1772    write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
1773    updates: Vec<TimestamplessUpdate>,
1774    at_least: T,
1775) {
1776    let mut expected_upper = write_handle.shared_upper();
1777    loop {
1778        if updates.is_empty() && expected_upper.is_empty() {
1779            // Ignore timestamp advancement for closed collections.
1780            //
1781            // TODO: Make this a correctable error?
1782            return;
1783        }
1784
1785        let upper = expected_upper
1786            .into_option()
1787            .expect("cannot append data to closed collection");
1788
1789        let lower = if upper.less_than(&at_least) {
1790            at_least.clone()
1791        } else {
1792            upper.clone()
1793        };
1794
1795        let new_upper = TimestampManipulation::step_forward(&lower);
1796        let updates = updates
1797            .iter()
1798            .map(|TimestamplessUpdate { row, diff }| {
1799                (
1800                    (SourceData(Ok(row.clone())), ()),
1801                    lower.clone(),
1802                    diff.into_inner(),
1803                )
1804            })
1805            .collect::<Vec<_>>();
1806        let res = write_handle
1807            .compare_and_append(
1808                updates,
1809                Antichain::from_elem(upper),
1810                Antichain::from_elem(new_upper),
1811            )
1812            .await
1813            .expect("valid usage");
1814        match res {
1815            Ok(()) => return,
1816            Err(err) => {
1817                expected_upper = err.current;
1818                continue;
1819            }
1820        }
1821    }
1822}
1823
1824// Helper function for notifying listeners.
1825fn notify_listeners<T>(
1826    responders: impl IntoIterator<Item = oneshot::Sender<T>>,
1827    result: impl Fn() -> T,
1828) {
1829    for r in responders {
1830        // We don't care if the listener disappeared.
1831        let _ = r.send(result());
1832    }
1833}
1834
1835#[cfg(test)]
1836mod tests {
1837    use std::collections::BTreeSet;
1838
1839    use super::*;
1840    use itertools::Itertools;
1841    use mz_repr::{Datum, Row};
1842    use mz_storage_client::client::StatusUpdate;
1843    use mz_storage_client::healthcheck::{
1844        MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC,
1845    };
1846
1847    #[mz_ore::test]
1848    fn test_row() {
1849        let error_message = "error message";
1850        let hint = "hint message";
1851        let id = GlobalId::User(1);
1852        let status = Status::Dropped;
1853        let row = Row::from(StatusUpdate {
1854            id,
1855            timestamp: chrono::offset::Utc::now(),
1856            status,
1857            error: Some(error_message.to_string()),
1858            hints: BTreeSet::from([hint.to_string()]),
1859            namespaced_errors: Default::default(),
1860            replica_id: None,
1861        });
1862
1863        for (datum, column_type) in row.iter().zip_eq(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
1864            assert!(datum.is_instance_of_sql(column_type));
1865        }
1866
1867        for (datum, column_type) in row
1868            .iter()
1869            .zip_eq(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types())
1870        {
1871            assert!(datum.is_instance_of_sql(column_type));
1872        }
1873
1874        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
1875        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
1876        assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
1877
1878        let details = row
1879            .iter()
1880            .nth(4)
1881            .unwrap()
1882            .unwrap_map()
1883            .iter()
1884            .collect::<Vec<_>>();
1885
1886        assert_eq!(details.len(), 1);
1887        let hint_datum = &details[0];
1888
1889        assert_eq!(hint_datum.0, "hints");
1890        assert_eq!(
1891            hint_datum.1.unwrap_list().iter().next().unwrap(),
1892            Datum::String(hint)
1893        );
1894    }
1895
1896    #[mz_ore::test]
1897    fn test_row_without_hint() {
1898        let error_message = "error message";
1899        let id = GlobalId::User(1);
1900        let status = Status::Dropped;
1901        let row = Row::from(StatusUpdate {
1902            id,
1903            timestamp: chrono::offset::Utc::now(),
1904            status,
1905            error: Some(error_message.to_string()),
1906            hints: Default::default(),
1907            namespaced_errors: Default::default(),
1908            replica_id: None,
1909        });
1910
1911        for (datum, column_type) in row.iter().zip_eq(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
1912            assert!(datum.is_instance_of_sql(column_type));
1913        }
1914
1915        for (datum, column_type) in row
1916            .iter()
1917            .zip_eq(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types())
1918        {
1919            assert!(datum.is_instance_of_sql(column_type));
1920        }
1921
1922        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
1923        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
1924        assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
1925        assert_eq!(row.iter().nth(4).unwrap(), Datum::Null);
1926    }
1927
1928    #[mz_ore::test]
1929    fn test_row_without_error() {
1930        let id = GlobalId::User(1);
1931        let status = Status::Dropped;
1932        let hint = "hint message";
1933        let row = Row::from(StatusUpdate {
1934            id,
1935            timestamp: chrono::offset::Utc::now(),
1936            status,
1937            error: None,
1938            hints: BTreeSet::from([hint.to_string()]),
1939            namespaced_errors: Default::default(),
1940            replica_id: None,
1941        });
1942
1943        for (datum, column_type) in row.iter().zip_eq(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
1944            assert!(datum.is_instance_of_sql(column_type));
1945        }
1946
1947        for (datum, column_type) in row
1948            .iter()
1949            .zip_eq(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types())
1950        {
1951            assert!(datum.is_instance_of_sql(column_type));
1952        }
1953
1954        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
1955        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
1956        assert_eq!(row.iter().nth(3).unwrap(), Datum::Null);
1957
1958        let details = row
1959            .iter()
1960            .nth(4)
1961            .unwrap()
1962            .unwrap_map()
1963            .iter()
1964            .collect::<Vec<_>>();
1965
1966        assert_eq!(details.len(), 1);
1967        let hint_datum = &details[0];
1968
1969        assert_eq!(hint_datum.0, "hints");
1970        assert_eq!(
1971            hint_datum.1.unwrap_list().iter().next().unwrap(),
1972            Datum::String(hint)
1973        );
1974    }
1975
1976    #[mz_ore::test]
1977    fn test_row_with_namespaced() {
1978        let error_message = "error message";
1979        let id = GlobalId::User(1);
1980        let status = Status::Dropped;
1981        let row = Row::from(StatusUpdate {
1982            id,
1983            timestamp: chrono::offset::Utc::now(),
1984            status,
1985            error: Some(error_message.to_string()),
1986            hints: Default::default(),
1987            namespaced_errors: BTreeMap::from([("thing".to_string(), "error".to_string())]),
1988            replica_id: None,
1989        });
1990
1991        for (datum, column_type) in row.iter().zip_eq(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
1992            assert!(datum.is_instance_of_sql(column_type));
1993        }
1994
1995        for (datum, column_type) in row
1996            .iter()
1997            .zip_eq(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types())
1998        {
1999            assert!(datum.is_instance_of_sql(column_type));
2000        }
2001
2002        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
2003        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
2004        assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
2005
2006        let details = row
2007            .iter()
2008            .nth(4)
2009            .unwrap()
2010            .unwrap_map()
2011            .iter()
2012            .collect::<Vec<_>>();
2013
2014        assert_eq!(details.len(), 1);
2015        let ns_datum = &details[0];
2016
2017        assert_eq!(ns_datum.0, "namespaced");
2018        assert_eq!(
2019            ns_datum.1.unwrap_map().iter().next().unwrap(),
2020            ("thing", Datum::String("error"))
2021        );
2022    }
2023
2024    #[mz_ore::test]
2025    fn test_row_with_everything() {
2026        let error_message = "error message";
2027        let hint = "hint message";
2028        let id = GlobalId::User(1);
2029        let status = Status::Dropped;
2030        let row = Row::from(StatusUpdate {
2031            id,
2032            timestamp: chrono::offset::Utc::now(),
2033            status,
2034            error: Some(error_message.to_string()),
2035            hints: BTreeSet::from([hint.to_string()]),
2036            namespaced_errors: BTreeMap::from([("thing".to_string(), "error".to_string())]),
2037            replica_id: None,
2038        });
2039
2040        for (datum, column_type) in row.iter().zip_eq(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
2041            assert!(datum.is_instance_of_sql(column_type));
2042        }
2043
2044        for (datum, column_type) in row
2045            .iter()
2046            .zip_eq(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types())
2047        {
2048            assert!(datum.is_instance_of_sql(column_type));
2049        }
2050
2051        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
2052        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
2053        assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
2054
2055        let details = row
2056            .iter()
2057            .nth(4)
2058            .unwrap()
2059            .unwrap_map()
2060            .iter()
2061            .collect::<Vec<_>>();
2062
2063        assert_eq!(details.len(), 2);
2064        // These are always sorted
2065        let hint_datum = &details[0];
2066        let ns_datum = &details[1];
2067
2068        assert_eq!(hint_datum.0, "hints");
2069        assert_eq!(
2070            hint_datum.1.unwrap_list().iter().next().unwrap(),
2071            Datum::String(hint)
2072        );
2073
2074        assert_eq!(ns_datum.0, "namespaced");
2075        assert_eq!(
2076            ns_datum.1.unwrap_map().iter().next().unwrap(),
2077            ("thing", Datum::String("error"))
2078        );
2079    }
2080}