mz_storage_controller/collection_mgmt.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Tokio tasks (and support machinery) for maintaining storage-managed
//! collections.
//!
//! We differentiate between append-only collections and differential
//! collections. The intent is that knowing the type allows being more
//! intentional about what state we keep in memory and how we work when in
//! read-only mode / during zero-downtime upgrades.
//!
//! ## Append-only collections
//!
//! Writers only append blind writes. Those writes never fail. It does not
//! matter at what timestamp they happen (to a degree, but ...).
//!
//! While in read-only mode, the append-only write task can immediately write
//! updates out as batches, but only append them when going out of read-only
//! mode.
//!
//! ## Differential collections
//!
//! These are very similar to the self-correcting persist_sink. We have an
//! in-memory desired state and continually make it so that persist matches
//! desired. As described below (in the task implementation), we could do this
//! in a memory-efficient way by keeping open a persist read handle and
//! continually updating/consolidating our desired collection. This way, we
//! would be memory-efficient even in read-only mode.
//!
//! This is an evolution of the current design where, on startup, we bring the
//! persist collection into a known state (mostly by retracting everything) and
//! then assume that this `envd` is the only writer. We panic when that is ever
//! not the case, which we notice when the upper of a collection changes
//! unexpectedly. With this new design we can instead continually update our
//! view of the persist shard and emit updates when needed, that is, whenever
//! `desired` changes.
//!
//! NOTE: As it is, we always keep all of desired in memory. Only when told to
//! go out of read-only mode would we start attempting to write.
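//!
//! As a sketch, the reconciliation step can be thought of as follows
//! (illustrative only, using plain `Vec`s and `i64` diffs instead of this
//! module's exact types): what still has to be written is `desired` plus the
//! negation of what is already in persist, consolidated down.
//!
//! ```ignore
//! use differential_dataflow::consolidation;
//!
//! // What we want the collection to contain ...
//! let desired = vec![("a".to_string(), 1i64), ("b".to_string(), 1)];
//! // ... and what a snapshot of the persist shard currently contains.
//! let persisted = vec![("a".to_string(), 1i64), ("c".to_string(), 1)];
//!
//! // to_write = desired + negate(persisted), consolidated.
//! let mut to_write: Vec<(String, i64)> = desired
//!     .into_iter()
//!     .chain(persisted.into_iter().map(|(row, diff)| (row, -diff)))
//!     .collect();
//! consolidation::consolidate(&mut to_write);
//!
//! // "a" cancels out; we are left with: insert "b", retract "c".
//! assert_eq!(to_write, vec![("b".to_string(), 1), ("c".to_string(), -1)]);
//! ```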
//!
//! ## Read-only mode
//!
//! When [`CollectionManager`] is in read-only mode it cannot write out to
//! persist. It will, however, maintain the `desired` state of differential
//! collections so that we can immediately start writing out updates when going
//! out of read-only mode.
//!
//! For append-only collections we either panic, in the case of
//! [`CollectionManager::blind_write`], or report back a
//! [`StorageError::ReadOnly`] when trying to append through a
//! [`MonotonicAppender`] returned from
//! [`CollectionManager::monotonic_appender`].

use std::any::Any;
use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap};
use std::fmt::Debug;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{anyhow, bail};
use chrono::{DateTime, Utc};
use differential_dataflow::consolidation;
use differential_dataflow::lattice::Lattice;
use futures::future::BoxFuture;
use futures::stream::StreamExt;
use futures::{Future, FutureExt};
use mz_cluster_client::ReplicaId;
use mz_dyncfg::ConfigSet;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::retry::Retry;
use mz_ore::soft_panic_or_log;
use mz_ore::task::AbortOnDropHandle;
use mz_ore::vec::VecExt;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::Codec64;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{ColumnName, Diff, GlobalId, Row, TimestampManipulation};
use mz_storage_client::client::{AppendOnlyUpdate, Status, TimestamplessUpdate};
use mz_storage_client::controller::{IntrospectionType, MonotonicAppender, StorageWriteOp};
use mz_storage_client::healthcheck::{
    MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_METRICS_HISTORY_DESC,
    WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC, WALLCLOCK_LAG_HISTORY_DESC,
};
use mz_storage_client::metrics::StorageControllerMetrics;
use mz_storage_client::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::InvalidUpper;
use mz_storage_types::dyncfgs::{
    REPLICA_METRICS_HISTORY_RETENTION_INTERVAL, WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL,
    WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL,
};
use mz_storage_types::parameters::{
    STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT, StorageParameters,
};
use mz_storage_types::sources::SourceData;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::{Duration, Instant};
use tracing::{debug, error, info};

use crate::{
    StatusHistoryDesc, StatusHistoryRetentionPolicy, StorageError, collection_mgmt,
    privatelink_status_history_desc, replica_status_history_desc, sink_status_history_desc,
    snapshot_statistics, source_status_history_desc, statistics,
};

// Default rate at which we advance the uppers of managed collections.
const DEFAULT_TICK_MS: u64 = 1_000;

/// A channel for sending writes to a differential collection.
type DifferentialWriteChannel<T> =
    mpsc::UnboundedSender<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>;

/// A channel for sending writes to an append-only collection.
type AppendOnlyWriteChannel<T> = mpsc::UnboundedSender<(
    Vec<AppendOnlyUpdate>,
    oneshot::Sender<Result<(), StorageError<T>>>,
)>;

type WriteTask = AbortOnDropHandle<()>;
type ShutdownSender = oneshot::Sender<()>;

/// Types of storage-managed/introspection collections:
///
/// Append-only: Only accepts blind writes, writes that can be applied at any
/// timestamp and don’t depend on current collection contents.
///
/// Pseudo append-only: We treat them largely as append-only collections but
/// periodically (currently on bootstrap) retract old updates from them.
///
/// Differential: at any given time `t`, the collection contents mirror some
/// (small-cardinality) state. The cardinality of the collection stays constant
/// if the thing that is mirrored doesn’t change in cardinality. At steady
/// state, updates always come in pairs of retractions/additions.
pub enum CollectionManagerKind {
    AppendOnly,
    Differential,
}

#[derive(Debug, Clone)]
pub struct CollectionManager<T>
where
    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
    /// When a [`CollectionManager`] is in read-only mode it must not make any
    /// changes to external state.
    read_only: bool,

    // WIP: Name TBD! I thought about `managed_collections`, `ivm_collections`,
    // `self_correcting_collections`.
    /// These are collections that we write to by adding/removing updates to an
    /// internal _desired_ collection. The `CollectionManager` continually makes
    /// sure that collection contents (in persist) match the desired state.
    differential_collections:
        Arc<Mutex<BTreeMap<GlobalId, (DifferentialWriteChannel<T>, WriteTask, ShutdownSender)>>>,

    /// Collections that we only append to using blind-writes.
    ///
    /// Every write succeeds at _some_ timestamp, and we never check what the
    /// actual contents of the collection (in persist) are.
    append_only_collections:
        Arc<Mutex<BTreeMap<GlobalId, (AppendOnlyWriteChannel<T>, WriteTask, ShutdownSender)>>>,

    /// Amount of time we'll wait before sending a batch of inserts to Persist, for user
    /// collections.
    user_batch_duration_ms: Arc<AtomicU64>,
    now: NowFn,
}

/// The `CollectionManager` provides two complementary functions:
/// - Providing an API to append values to a registered set of collections.
///   For this use case:
///     - The `CollectionManager` expects to be the only writer.
///     - Appending to a closed collection panics.
/// - Automatically advancing the timestamp of managed collections every
///   second. For this use case:
///     - The `CollectionManager` handles contention by permitting and ignoring errors.
///     - Closed collections do not cause a panic if they continue receiving these requests.
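///
/// A minimal usage sketch from within the storage controller (illustrative
/// only; `id`, `diff_id`, `write_handle`, `updates`, and `row` are assumed to
/// exist, registration of the differential collection is elided, and error
/// handling is omitted):
///
/// ```ignore
/// let manager = CollectionManager::<mz_repr::Timestamp>::new(
///     /* read_only */ false,
///     mz_ore::now::SYSTEM_TIME.clone(),
/// );
///
/// // Append-only: register, then blindly append at _some_ timestamp.
/// manager.register_append_only_collection(id, write_handle, false, None);
/// manager.blind_write(id, updates);
///
/// // Differential: describe the desired contents; the manager reconciles
/// // persist with that state in the background.
/// manager.differential_append(diff_id, vec![(row, Diff::from(1i64))]);
/// ```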
impl<T> CollectionManager<T>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    pub(super) fn new(read_only: bool, now: NowFn) -> CollectionManager<T> {
        let batch_duration_ms: u64 = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT
            .as_millis()
            .try_into()
            .expect("known to fit");

        CollectionManager {
            read_only,
            differential_collections: Arc::new(Mutex::new(BTreeMap::new())),
            append_only_collections: Arc::new(Mutex::new(BTreeMap::new())),
            user_batch_duration_ms: Arc::new(AtomicU64::new(batch_duration_ms)),
            now,
        }
    }

    /// Updates the duration we'll wait to batch events for user owned collections.
    pub fn update_user_batch_duration(&self, duration: Duration) {
        tracing::info!(?duration, "updating user batch duration");
        let millis: u64 = duration.as_millis().try_into().unwrap_or(u64::MAX);
        self.user_batch_duration_ms.store(millis, Ordering::Relaxed);
    }

    /// Registers a new _differential collection_.
    ///
    /// The [CollectionManager] will automatically advance the upper of every
    /// registered collection every second.
    ///
    /// Update the `desired` state of a differential collection using
    /// [Self::differential_write].
    pub(super) fn register_differential_collection<R>(
        &self,
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        read_handle_fn: R,
        force_writable: bool,
        introspection_config: DifferentialIntrospectionConfig<T>,
    ) where
        R: FnMut() -> Pin<
                Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>,
            > + Send
            + Sync
            + 'static,
    {
        let mut guard = self
            .differential_collections
            .lock()
            .expect("collection_mgmt panicked");

        // Check if this collection is already registered.
        if let Some((_writer, task, _shutdown_tx)) = guard.get(&id) {
            // The collection is already registered and the task is still running so nothing to do.
            if !task.is_finished() {
                // TODO(parkmycar): Panic here if we never see this error in production.
                tracing::error!("Registered a collection twice! {id:?}");
                return;
            }
        }

        let read_only = self.get_read_only(id, force_writable);

        // Spawns a new task so we can write to this collection.
        let writer_and_handle = DifferentialWriteTask::spawn(
            id,
            write_handle,
            read_handle_fn,
            read_only,
            self.now.clone(),
            introspection_config,
        );
        let prev = guard.insert(id, writer_and_handle);

        // Double check the previous task was actually finished.
        if let Some((_, prev_task, _)) = prev {
            assert!(
                prev_task.is_finished(),
                "should only spawn a new task if the previous is finished"
            );
        }
    }

    /// Registers a new _append-only collection_.
    ///
    /// The [CollectionManager] will automatically advance the upper of every
    /// registered collection every second.
    pub(super) fn register_append_only_collection(
        &self,
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        force_writable: bool,
        introspection_config: Option<AppendOnlyIntrospectionConfig<T>>,
    ) {
        let mut guard = self
            .append_only_collections
            .lock()
            .expect("collection_mgmt panicked");

        // Check if this collection is already registered.
        if let Some((_writer, task, _shutdown_tx)) = guard.get(&id) {
            // The collection is already registered and the task is still running so nothing to do.
            if !task.is_finished() {
                // TODO(parkmycar): Panic here if we never see this error in production.
                tracing::error!("Registered a collection twice! {id:?}");
                return;
            }
        }

        let read_only = self.get_read_only(id, force_writable);

        // Spawns a new task so we can write to this collection.
        let writer_and_handle = AppendOnlyWriteTask::spawn(
            id,
            write_handle,
            read_only,
            self.now.clone(),
            Arc::clone(&self.user_batch_duration_ms),
            introspection_config,
        );
        let prev = guard.insert(id, writer_and_handle);

        // Double check the previous task was actually finished.
        if let Some((_, prev_task, _)) = prev {
            assert!(
                prev_task.is_finished(),
                "should only spawn a new task if the previous is finished"
            );
        }
    }

    /// Unregisters the given collection.
    ///
    /// Also waits until the `CollectionManager` has completed all outstanding work to ensure that
    /// it has stopped referencing the provided `id`.
    #[mz_ore::instrument(level = "debug")]
    pub(super) fn unregister_collection(&self, id: GlobalId) -> BoxFuture<'static, ()> {
        let prev = self
            .differential_collections
            .lock()
            .expect("CollectionManager panicked")
            .remove(&id);

        // Wait for the task to complete before reporting as unregistered.
        if let Some((_prev_writer, prev_task, shutdown_tx)) = prev {
            // Notify the task it needs to shut down.
            //
            // We can ignore errors here because they indicate the task is already done.
            let _ = shutdown_tx.send(());
            return Box::pin(prev_task.map(|_| ()));
        }

        let prev = self
            .append_only_collections
            .lock()
            .expect("CollectionManager panicked")
            .remove(&id);

        // Wait for the task to complete before reporting as unregistered.
        if let Some((_prev_writer, prev_task, shutdown_tx)) = prev {
            // Notify the task it needs to shut down.
            //
            // We can ignore errors here because they indicate the task is already done.
            let _ = shutdown_tx.send(());
            return Box::pin(prev_task.map(|_| ()));
        }

        Box::pin(futures::future::ready(()))
    }

    /// Returns a sender for writes to the given append-only collection.
    ///
    /// # Panics
    /// - If `id` does not belong to an append-only collection.
    pub(super) fn append_only_write_sender(&self, id: GlobalId) -> AppendOnlyWriteChannel<T> {
        let collections = self.append_only_collections.lock().expect("poisoned");
        match collections.get(&id) {
            Some((tx, _, _)) => tx.clone(),
            None => panic!("missing append-only collection: {id}"),
        }
    }

    /// Returns a sender for writes to the given differential collection.
    ///
    /// # Panics
    /// - If `id` does not belong to a differential collection.
    pub(super) fn differential_write_sender(&self, id: GlobalId) -> DifferentialWriteChannel<T> {
        let collections = self.differential_collections.lock().expect("poisoned");
        match collections.get(&id) {
            Some((tx, _, _)) => tx.clone(),
            None => panic!("missing differential collection: {id}"),
        }
    }

    /// Appends `updates` to the append-only collection identified by `id`, at
    /// _some_ timestamp. Does not wait for the append to complete.
    ///
    /// # Panics
    /// - If `id` does not belong to an append-only collection.
    /// - If this [`CollectionManager`] is in read-only mode.
    /// - If the collection is closed.
    pub(super) fn blind_write(&self, id: GlobalId, updates: Vec<AppendOnlyUpdate>) {
        if self.read_only {
            panic!("attempting blind write to {} while in read-only mode", id);
        }

        if !updates.is_empty() {
            let update_tx = self.append_only_write_sender(id);
            let (tx, _rx) = oneshot::channel();
            update_tx.send((updates, tx)).expect("rx hung up");
        }
    }

    /// Updates the desired collection state of the differential collection identified by
    /// `id`. The underlying persist shard will reflect this change at
    /// _some_ point. Does not wait for the change to complete.
    ///
    /// # Panics
    /// - If `id` does not belong to a differential collection.
    /// - If the collection is closed.
    pub(super) fn differential_write(&self, id: GlobalId, op: StorageWriteOp) {
        if !op.is_empty_append() {
            let update_tx = self.differential_write_sender(id);
            let (tx, _rx) = oneshot::channel();
            update_tx.send((op, tx)).expect("rx hung up");
        }
    }

    /// Appends the given `updates` to the differential collection identified by `id`.
    ///
    /// # Panics
    /// - If `id` does not belong to a differential collection.
    /// - If the collection is closed.
    pub(super) fn differential_append(&self, id: GlobalId, updates: Vec<(Row, Diff)>) {
        self.differential_write(id, StorageWriteOp::Append { updates })
    }

    /// Returns a [`MonotonicAppender`] that can be used to monotonically append updates to the
    /// collection correlated with `id`.
    pub(super) fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<T>, StorageError<T>> {
        let guard = self
            .append_only_collections
            .lock()
            .expect("CollectionManager panicked");
        let tx = guard
            .get(&id)
            .map(|(tx, _, _)| tx.clone())
            .ok_or(StorageError::IdentifierMissing(id))?;

        Ok(MonotonicAppender::new(tx))
    }

    fn get_read_only(&self, id: GlobalId, force_writable: bool) -> bool {
        if force_writable {
            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
            false
        } else {
            self.read_only
        }
    }
}

pub(crate) struct DifferentialIntrospectionConfig<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    pub(crate) recent_upper: Antichain<T>,
    pub(crate) introspection_type: IntrospectionType,
    pub(crate) storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,
    pub(crate) source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    pub(crate) sink_statistics:
        Arc<Mutex<BTreeMap<GlobalId, statistics::StatsState<SinkStatisticsUpdate>>>>,
    pub(crate) statistics_interval: Duration,
    pub(crate) statistics_interval_receiver: watch::Receiver<Duration>,
    pub(crate) metrics: StorageControllerMetrics,
    pub(crate) introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,
}

/// A task that will make it so that the state in persist matches the desired
/// state and continuously bump the upper for the specified collection.
///
/// NOTE: This implementation is a bit clunky, and could be optimized by not keeping
/// all of desired in memory (see comment below). It is meant to showcase the
/// general approach.
struct DifferentialWriteTask<T, R>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
    R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>>
        + Send
        + 'static,
{
    /// The collection that we are writing to.
    id: GlobalId,

    write_handle: WriteHandle<SourceData, (), T, StorageDiff>,

    /// For getting a [`ReadHandle`] to sync our state to persist contents.
    read_handle_fn: R,

    read_only: bool,

    now: NowFn,

    /// In the absence of updates, we regularly bump the upper to "now", on this
    /// interval. This makes it so the collection remains readable at recent
    /// timestamps.
    upper_tick_interval: tokio::time::Interval,

    /// Receiver for write commands. These change our desired state.
    cmd_rx: mpsc::UnboundedReceiver<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>,

    /// We have to shut down when receiving from this.
    shutdown_rx: oneshot::Receiver<()>,

    /// The contents of the collection as it should be according to whoever is
    /// driving us around.
    // This is memory inefficient: we always keep a full copy of
    // desired, so that we can re-derive a to_write if/when someone else
    // writes to persist and we notice because of an upper conflict.
    // This is optimized for the case where we rarely have more than one
    // writer.
    //
    // We can optimize for a multi-writer case by keeping an open
    // ReadHandle and continually reading updates from persist, updating
    // a desired in place. Similar to the self-correcting persist_sink.
    desired: Vec<(Row, Diff)>,

    /// Updates that we have to write when next writing to persist. This is
    /// determined by looking at what is desired and what is in persist.
    to_write: Vec<(Row, Diff)>,

    /// Current upper of the persist shard. We keep track of this so that we
    /// realize when someone else writes to the shard, in which case we have to
    /// update our state of the world, that is, update our `to_write` based on
    /// `desired` and the contents of the persist shard.
    current_upper: T,
}

impl<T, R> DifferentialWriteTask<T, R>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
    R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>>
        + Send
        + Sync
        + 'static,
{
    /// Spawns a [`DifferentialWriteTask`] in an [`mz_ore::task`] and returns
    /// handles for interacting with it.
    fn spawn(
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        read_handle_fn: R,
        read_only: bool,
        now: NowFn,
        introspection_config: DifferentialIntrospectionConfig<T>,
    ) -> (DifferentialWriteChannel<T>, WriteTask, ShutdownSender) {
        let (tx, rx) = mpsc::unbounded_channel();
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let upper_tick_interval = tokio::time::interval(Duration::from_millis(DEFAULT_TICK_MS));

        let current_upper = T::minimum();

        let task = Self {
            id,
            write_handle,
            read_handle_fn,
            read_only,
            now,
            upper_tick_interval,
            cmd_rx: rx,
            shutdown_rx,
            desired: Vec::new(),
            to_write: Vec::new(),
            current_upper,
        };

        let handle = mz_ore::task::spawn(
            || format!("CollectionManager-differential_write_task-{id}"),
            async move {
                if !task.read_only {
                    task.prepare(introspection_config).await;
                }
                let res = task.run().await;

                match res {
                    ControlFlow::Break(reason) => {
                        info!("write_task-{} ending: {}", id, reason);
                    }
                    c => {
                        unreachable!(
                            "cannot break out of the loop with a Continue, but got: {:?}",
                            c
                        );
                    }
                }
            },
        );

        (tx, handle.abort_on_drop(), shutdown_tx)
    }

    /// Does any work that is required before this background task starts
    /// writing to the given introspection collection.
    ///
    /// This might include consolidation, deleting older entries or seeding
    /// in-memory state of, say, scrapers, with current collection contents.
    async fn prepare(&self, introspection_config: DifferentialIntrospectionConfig<T>) {
        tracing::info!(%self.id, ?introspection_config.introspection_type, "preparing differential introspection collection for writes");

        match introspection_config.introspection_type {
            IntrospectionType::ShardMapping => {
                // Done by the `append_shard_mappings` call.
            }
            IntrospectionType::Frontiers | IntrospectionType::ReplicaFrontiers => {
                // Differential collections start with an empty
                // desired state. No need to manually reset.
            }
            IntrospectionType::StorageSourceStatistics => {
                let prev = snapshot_statistics(
                    self.id,
                    introspection_config.recent_upper,
                    &introspection_config.storage_collections,
                )
                .await;

                let scraper_token = statistics::spawn_statistics_scraper::<
                    statistics::SourceStatistics,
                    SourceStatisticsUpdate,
                    _,
                >(
                    self.id.clone(),
                    // These do a shallow copy.
                    introspection_config.collection_manager,
                    Arc::clone(&introspection_config.source_statistics),
                    prev,
                    introspection_config.statistics_interval.clone(),
                    introspection_config.statistics_interval_receiver.clone(),
                    introspection_config.metrics,
                );
                let web_token = statistics::spawn_webhook_statistics_scraper(
                    introspection_config.source_statistics,
                    introspection_config.statistics_interval,
                    introspection_config.statistics_interval_receiver,
                );

                // Make sure these are dropped when the controller is
                // dropped, so that the internal task will stop.
                introspection_config
                    .introspection_tokens
                    .lock()
                    .expect("poisoned")
                    .insert(self.id, Box::new((scraper_token, web_token)));
            }
            IntrospectionType::StorageSinkStatistics => {
                let prev = snapshot_statistics(
                    self.id,
                    introspection_config.recent_upper,
                    &introspection_config.storage_collections,
                )
                .await;

                let scraper_token =
                    statistics::spawn_statistics_scraper::<_, SinkStatisticsUpdate, _>(
                        self.id.clone(),
                        introspection_config.collection_manager,
                        Arc::clone(&introspection_config.sink_statistics),
                        prev,
                        introspection_config.statistics_interval,
                        introspection_config.statistics_interval_receiver,
                        introspection_config.metrics,
                    );

                // Make sure this is dropped when the controller is
                // dropped, so that the internal task will stop.
                introspection_config
                    .introspection_tokens
                    .lock()
                    .expect("poisoned")
                    .insert(self.id, scraper_token);
            }

            IntrospectionType::ComputeDependencies
            | IntrospectionType::ComputeOperatorHydrationStatus
            | IntrospectionType::ComputeMaterializedViewRefreshes
            | IntrospectionType::ComputeErrorCounts
            | IntrospectionType::ComputeHydrationTimes => {
                // Differential collections start with an empty
                // desired state. No need to manually reset.
            }

            introspection_type @ IntrospectionType::ReplicaMetricsHistory
            | introspection_type @ IntrospectionType::WallclockLagHistory
            | introspection_type @ IntrospectionType::WallclockLagHistogram
            | introspection_type @ IntrospectionType::PreparedStatementHistory
            | introspection_type @ IntrospectionType::StatementExecutionHistory
            | introspection_type @ IntrospectionType::SessionHistory
            | introspection_type @ IntrospectionType::StatementLifecycleHistory
            | introspection_type @ IntrospectionType::SqlText
            | introspection_type @ IntrospectionType::SourceStatusHistory
            | introspection_type @ IntrospectionType::SinkStatusHistory
            | introspection_type @ IntrospectionType::PrivatelinkConnectionStatusHistory
            | introspection_type @ IntrospectionType::ReplicaStatusHistory => {
                unreachable!("not differential collection: {introspection_type:?}")
            }
        }
    }

    async fn run(mut self) -> ControlFlow<String> {
        const BATCH_SIZE: usize = 4096;
        let mut updates = Vec::with_capacity(BATCH_SIZE);
        loop {
            tokio::select! {
                // Prefer sending actual updates over just bumping the upper,
                // because sending updates also bumps the upper.
                biased;

                // Listen for a shutdown signal so we can gracefully clean up.
                _ = &mut self.shutdown_rx => {
                    self.handle_shutdown();

                    return ControlFlow::Break("graceful shutdown".to_string());
                }

                // Pull a chunk of queued updates off the channel.
                count = self.cmd_rx.recv_many(&mut updates, BATCH_SIZE) => {
                    if count > 0 {
                        let _ = self.handle_updates(&mut updates).await?;
                    } else {
                        // The sender has been dropped, which means the collection
                        // should have been unregistered; break out of the run
                        // loop if we weren't already aborted.
                        return ControlFlow::Break("sender has been dropped".to_string());
                    }
                }

                // If we haven't received any updates, then we'll move the upper forward.
                _ = self.upper_tick_interval.tick() => {
                    if self.read_only {
                        // Not bumping uppers while in read-only mode.
                        continue;
                    }
                    let _ = self.tick_upper().await?;
                },
            }
        }
    }

    async fn tick_upper(&mut self) -> ControlFlow<String> {
        let now = T::from((self.now)());

        if now <= self.current_upper {
            // Upper is already further along than current wall-clock time, no
            // need to bump it.
            return ControlFlow::Continue(());
        }

        assert!(!self.read_only);
        let res = self
            .write_handle
            .compare_and_append_batch(
                &mut [],
                Antichain::from_elem(self.current_upper.clone()),
                Antichain::from_elem(now.clone()),
            )
            .await
            .expect("valid usage");
        match res {
            // All good!
            Ok(()) => {
                tracing::debug!(%self.id, "bumped upper of differential collection");
                self.current_upper = now;
            }
            Err(err) => {
                // Someone else wrote to the collection or bumped the upper. We
                // need to sync to latest persist state and potentially patch up
                // our `to_write`, based on what we learn and `desired`.

                let actual_upper = if let Some(ts) = err.current.as_option() {
                    ts.clone()
                } else {
                    return ControlFlow::Break("upper is the empty antichain".to_string());
                };

                tracing::info!(%self.id, ?actual_upper, expected_upper = ?self.current_upper, "upper mismatch while bumping upper, syncing to persist state");

                self.current_upper = actual_upper;

                self.sync_to_persist().await;
            }
        }

        ControlFlow::Continue(())
    }

    fn handle_shutdown(&mut self) {
        let mut senders = Vec::new();

        // Prevent new messages from being sent.
        self.cmd_rx.close();

        // Get as many waiting senders as possible.
        while let Ok((_batch, sender)) = self.cmd_rx.try_recv() {
            senders.push(sender);
        }

        // Notify them that this collection is closed.
        //
        // Note: if a task is shutting down, that indicates the source has been
        // dropped, at which point the identifier is invalid. Returning this
        // error provides a better user experience.
        notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));
    }

    async fn handle_updates(
        &mut self,
        batch: &mut Vec<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>,
    ) -> ControlFlow<String> {
        // Put in place _some_ rate limiting.
        let batch_duration_ms = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT;

        let use_batch_now = Instant::now();
        let min_time_to_complete = use_batch_now + batch_duration_ms;

        tracing::debug!(
            ?use_batch_now,
            ?batch_duration_ms,
            ?min_time_to_complete,
            "batch duration",
        );

        let mut responders = Vec::with_capacity(batch.len());
        for (op, tx) in batch.drain(..) {
            self.apply_write_op(op);
            responders.push(tx);
        }

        // TODO: Maybe don't do it every time?
        consolidation::consolidate(&mut self.desired);
        consolidation::consolidate(&mut self.to_write);

        // Reset the interval which is used to periodically bump the uppers
        // because the uppers will get bumped with the following update.
        // This makes it such that we will write at most once every
        // `interval`.
        //
        // For example, let's say our `DEFAULT_TICK` interval is 10, so at
        // `t + 10`, `t + 20`, ... we'll bump the uppers. If we receive an
        // update at `t + 3` we want to shift this window so we bump the
        // uppers at `t + 13`, `t + 23`, ... which resetting the interval
        // accomplishes.
        self.upper_tick_interval.reset();

        self.write_to_persist(responders).await?;

        // Wait until our artificial latency has completed.
        //
        // Note: if writing to persist took longer than `DEFAULT_TICK` this
        // await will resolve immediately.
        tokio::time::sleep_until(min_time_to_complete).await;

        ControlFlow::Continue(())
    }

    /// Apply the given write operation to the `desired`/`to_write` state.
    fn apply_write_op(&mut self, op: StorageWriteOp) {
        match op {
            StorageWriteOp::Append { updates } => {
                self.desired.extend_from_slice(&updates);
                self.to_write.extend(updates);
            }
            StorageWriteOp::Delete { filter } => {
                let to_delete = self.desired.drain_filter_swapping(|(row, _)| filter(row));
                let retractions = to_delete.map(|(row, diff)| (row, -diff));
                self.to_write.extend(retractions);
            }
        }
    }

    /// Attempt to write what is currently in [Self::to_write] to persist,
    /// retrying and re-syncing to persist when necessary, that is, when the
    /// upper was not what we expected.
    async fn write_to_persist(
        &mut self,
        responders: Vec<oneshot::Sender<Result<(), StorageError<T>>>>,
    ) -> ControlFlow<String> {
        if self.read_only {
            tracing::debug!(%self.id, "not writing to differential collection: read-only");
            // Not attempting to write while in read-only mode.
            return ControlFlow::Continue(());
        }

        // We'll try really hard to succeed, but eventually stop.
        //
        // Note: it's very rare we should ever need to retry, and if we need to
        // retry it should only take 1 or 2 attempts. We set `max_tries` to be
        // high though because if we hit some edge case we want to try hard to
        // commit the data.
        let retries = Retry::default()
            .initial_backoff(Duration::from_secs(1))
            .clamp_backoff(Duration::from_secs(3))
            .factor(1.25)
            .max_tries(20)
            .into_retry_stream();
        let mut retries = Box::pin(retries);

        loop {
            // Append updates to persist!
            let now = T::from((self.now)());
            let new_upper = std::cmp::max(
                now,
                TimestampManipulation::step_forward(&self.current_upper),
            );

            let updates_to_write = self
                .to_write
                .iter()
                .map(|(row, diff)| {
                    (
                        (SourceData(Ok(row.clone())), ()),
                        self.current_upper.clone(),
                        diff.into_inner(),
                    )
                })
                .collect::<Vec<_>>();

            assert!(!self.read_only);
            let res = self
                .write_handle
                .compare_and_append(
                    updates_to_write,
                    Antichain::from_elem(self.current_upper.clone()),
                    Antichain::from_elem(new_upper.clone()),
                )
                .await
                .expect("valid usage");
            match res {
                // Everything was successful!
                Ok(()) => {
                    // Notify all of our listeners.
                    notify_listeners(responders, || Ok(()));

                    self.current_upper = new_upper;

                    // Very important! This is empty at steady state, while
                    // desired keeps an in-memory copy of desired state.
                    self.to_write.clear();

                    tracing::debug!(%self.id, "appended to differential collection");

                    // Break out of the retry loop so we can wait for more data.
                    break;
                }
                // Failed to write to the collection.
                Err(err) => {
                    // Someone else wrote to the collection. We need to read
                    // from persist and update to_write based on that and the
                    // desired state.
                    let actual_upper = if let Some(ts) = err.current.as_option() {
                        ts.clone()
                    } else {
                        return ControlFlow::Break("upper is the empty antichain".to_string());
                    };

                    tracing::info!(%self.id, ?actual_upper, expected_upper = ?self.current_upper, "retrying append for differential collection");

                    // If we've exhausted all of our retries, notify listeners and
                    // break out of the retry loop so we can wait for more data.
                    if retries.next().await.is_none() {
                        let invalid_upper = InvalidUpper {
                            id: self.id,
                            current_upper: err.current,
                        };
                        notify_listeners(responders, || {
                            Err(StorageError::InvalidUppers(vec![invalid_upper.clone()]))
                        });
                        error!(
                            "exhausted retries when appending to managed collection {}",
                            self.id
                        );
                        break;
                    }

                    self.current_upper = actual_upper;

                    self.sync_to_persist().await;

                    debug!(
                        "Retrying invalid-uppers error while appending to differential collection {}",
                        self.id
                    );
                }
            }
        }

        ControlFlow::Continue(())
    }

    /// Re-derives [Self::to_write] by looking at [Self::desired] and the
    /// current state in persist. We want to insert everything in desired and
    /// retract everything in persist. But ideally most of that cancels out in
    /// consolidation.
    ///
    /// To be called when a `compare_and_append` failed because the upper didn't
    /// match what we expected.
    async fn sync_to_persist(&mut self) {
        let mut read_handle = (self.read_handle_fn)().await;
        let as_of = self
            .current_upper
            .step_back()
            .unwrap_or_else(|| T::minimum());
        let as_of = Antichain::from_elem(as_of);
        let snapshot = read_handle.snapshot_and_fetch(as_of).await;

        let mut negated_oks = match snapshot {
            Ok(contents) => {
                let mut snapshot = Vec::with_capacity(contents.len());
                for ((data, _), _, diff) in contents {
                    let row = data.expect("invalid protobuf data").0.unwrap();
                    snapshot.push((row, -Diff::from(diff)));
                }
                snapshot
            }
            Err(_) => panic!("read before since"),
        };

        self.to_write.clear();
        self.to_write.extend(self.desired.iter().cloned());
        self.to_write.append(&mut negated_oks);
        consolidation::consolidate(&mut self.to_write);
    }
}

pub(crate) struct AppendOnlyIntrospectionConfig<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    pub(crate) introspection_type: IntrospectionType,
    pub(crate) config_set: Arc<ConfigSet>,
    pub(crate) parameters: StorageParameters,
    pub(crate) storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
}

/// A task that writes to an append-only collection and continuously bumps the
/// upper for the specified collection.
///
/// For status history collections, this task can deduplicate redundant [`Statuses`](Status).
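///
/// One plausible shape of that deduplication check (illustrative only, not the
/// exact logic used by this task; names like `object_id` and `new_status` are
/// hypothetical): keep the last seen [`Status`] per `(GlobalId, Option<ReplicaId>)`
/// and drop an incoming status if it matches the previously recorded one.
///
/// ```ignore
/// let key = (object_id, replica_id);
/// if previous_statuses.get(&key) != Some(&new_status) {
///     previous_statuses.insert(key, new_status.clone());
///     updates_to_write.push(new_status_row);
/// }
/// ```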
struct AppendOnlyWriteTask<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    /// The collection that we are writing to.
    id: GlobalId,
    write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
    read_only: bool,
    now: NowFn,
    user_batch_duration_ms: Arc<AtomicU64>,
    /// Receiver for write commands.
    rx: mpsc::UnboundedReceiver<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<T>>>,
    )>,

    /// We have to shut down when receiving from this.
    shutdown_rx: oneshot::Receiver<()>,
    /// If this collection deduplicates statuses, this map is used to track the previous status.
    previous_statuses: Option<BTreeMap<(GlobalId, Option<ReplicaId>), Status>>,
}

impl<T> AppendOnlyWriteTask<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    /// Spawns an [`AppendOnlyWriteTask`] in an [`mz_ore::task`] that will continuously bump the
    /// upper for the specified collection, and append data that is sent via the provided
    /// [`mpsc::UnboundedSender`].
    ///
    /// TODO(parkmycar): One day if we want to customize the tick interval for each collection, that
    /// should be done here.
    /// TODO(parkmycar): Maybe add prometheus metrics for each collection?
    fn spawn(
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        read_only: bool,
        now: NowFn,
        user_batch_duration_ms: Arc<AtomicU64>,
        introspection_config: Option<AppendOnlyIntrospectionConfig<T>>,
    ) -> (AppendOnlyWriteChannel<T>, WriteTask, ShutdownSender) {
        let (tx, rx) = mpsc::unbounded_channel();
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let previous_statuses: Option<BTreeMap<(GlobalId, Option<ReplicaId>), Status>> =
            match introspection_config
                .as_ref()
                .map(|config| config.introspection_type)
            {
                Some(IntrospectionType::SourceStatusHistory)
                | Some(IntrospectionType::SinkStatusHistory) => Some(BTreeMap::new()),

                Some(IntrospectionType::ReplicaMetricsHistory)
                | Some(IntrospectionType::WallclockLagHistory)
                | Some(IntrospectionType::WallclockLagHistogram)
                | Some(IntrospectionType::PrivatelinkConnectionStatusHistory)
                | Some(IntrospectionType::ReplicaStatusHistory)
                | Some(IntrospectionType::PreparedStatementHistory)
                | Some(IntrospectionType::StatementExecutionHistory)
                | Some(IntrospectionType::SessionHistory)
                | Some(IntrospectionType::StatementLifecycleHistory)
                | Some(IntrospectionType::SqlText)
                | None => None,

                Some(introspection_type @ IntrospectionType::ShardMapping)
                | Some(introspection_type @ IntrospectionType::Frontiers)
                | Some(introspection_type @ IntrospectionType::ReplicaFrontiers)
                | Some(introspection_type @ IntrospectionType::StorageSourceStatistics)
                | Some(introspection_type @ IntrospectionType::StorageSinkStatistics)
                | Some(introspection_type @ IntrospectionType::ComputeDependencies)
                | Some(introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus)
                | Some(introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes)
                | Some(introspection_type @ IntrospectionType::ComputeErrorCounts)
                | Some(introspection_type @ IntrospectionType::ComputeHydrationTimes) => {
                    unreachable!("not append-only collection: {introspection_type:?}")
                }
            };

        let mut task = Self {
            id,
            write_handle,
            rx,
            shutdown_rx,
            read_only,
            now,
            user_batch_duration_ms,
            previous_statuses,
        };

        let handle = mz_ore::task::spawn(
            || format!("CollectionManager-append_only_write_task-{id}"),
            async move {
                if !task.read_only {
                    task.prepare(introspection_config).await;
                }
                task.run().await;
            },
        );

        (tx, handle.abort_on_drop(), shutdown_tx)
    }

    /// Does any work that is required before the background task starts
    /// writing to the given append-only introspection collection.
    ///
    /// This might include consolidation or deleting older entries.
    async fn prepare(&mut self, introspection_config: Option<AppendOnlyIntrospectionConfig<T>>) {
        let Some(AppendOnlyIntrospectionConfig {
            introspection_type,
            config_set,
            parameters,
            storage_collections,
        }) = introspection_config
        else {
            return;
        };
        let initial_statuses = match introspection_type {
            IntrospectionType::ReplicaMetricsHistory
            | IntrospectionType::WallclockLagHistory
            | IntrospectionType::WallclockLagHistogram => {
                let result = partially_truncate_metrics_history(
                    self.id,
                    introspection_type,
                    &mut self.write_handle,
                    config_set,
                    self.now.clone(),
                    storage_collections,
                )
                .await;
                if let Err(error) = result {
                    soft_panic_or_log!(
                        "error truncating metrics history: {error} (type={introspection_type:?})"
                    );
                }
                Vec::new()
            }

            IntrospectionType::PrivatelinkConnectionStatusHistory => {
                partially_truncate_status_history(
                    self.id,
                    IntrospectionType::PrivatelinkConnectionStatusHistory,
                    &mut self.write_handle,
                    privatelink_status_history_desc(&parameters),
                    self.now.clone(),
                    &storage_collections,
                )
                .await;
                Vec::new()
            }
            IntrospectionType::ReplicaStatusHistory => {
                partially_truncate_status_history(
                    self.id,
                    IntrospectionType::ReplicaStatusHistory,
                    &mut self.write_handle,
                    replica_status_history_desc(&parameters),
                    self.now.clone(),
                    &storage_collections,
                )
                .await;
                Vec::new()
            }

            // Note [btv] - we don't truncate these, because that uses
            // a huge amount of memory on environmentd startup.
            IntrospectionType::PreparedStatementHistory
            | IntrospectionType::StatementExecutionHistory
            | IntrospectionType::SessionHistory
            | IntrospectionType::StatementLifecycleHistory
            | IntrospectionType::SqlText => {
                // NOTE(aljoscha): We never remove from these
                // collections. Someone, at some point, needs to
                // think about that! Issue:
                // https://github.com/MaterializeInc/database-issues/issues/7666
                Vec::new()
            }

            IntrospectionType::SourceStatusHistory => {
                let last_status_per_id = partially_truncate_status_history(
                    self.id,
                    IntrospectionType::SourceStatusHistory,
                    &mut self.write_handle,
                    source_status_history_desc(&parameters),
                    self.now.clone(),
                    &storage_collections,
                )
                .await;

                let status_col = MZ_SOURCE_STATUS_HISTORY_DESC
                    .get_by_name(&ColumnName::from("status"))
                    .expect("schema has not changed")
                    .0;

                last_status_per_id
                    .into_iter()
                    .map(|(id, row)| {
                        (
                            id,
                            Status::from_str(
                                row.iter()
                                    .nth(status_col)
                                    .expect("schema has not changed")
                                    .unwrap_str(),
                            )
                            .expect("statuses must be uncorrupted"),
                        )
                    })
                    .collect()
            }
            IntrospectionType::SinkStatusHistory => {
                let last_status_per_id = partially_truncate_status_history(
                    self.id,
                    IntrospectionType::SinkStatusHistory,
                    &mut self.write_handle,
                    sink_status_history_desc(&parameters),
                    self.now.clone(),
                    &storage_collections,
                )
                .await;

                let status_col = MZ_SINK_STATUS_HISTORY_DESC
                    .get_by_name(&ColumnName::from("status"))
                    .expect("schema has not changed")
                    .0;

                last_status_per_id
                    .into_iter()
                    .map(|(id, row)| {
                        (
                            id,
                            Status::from_str(
                                row.iter()
                                    .nth(status_col)
                                    .expect("schema has not changed")
                                    .unwrap_str(),
                            )
                            .expect("statuses must be uncorrupted"),
                        )
                    })
                    .collect()
            }

            introspection_type @ IntrospectionType::ShardMapping
            | introspection_type @ IntrospectionType::Frontiers
            | introspection_type @ IntrospectionType::ReplicaFrontiers
            | introspection_type @ IntrospectionType::StorageSourceStatistics
            | introspection_type @ IntrospectionType::StorageSinkStatistics
            | introspection_type @ IntrospectionType::ComputeDependencies
            | introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus
            | introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes
            | introspection_type @ IntrospectionType::ComputeErrorCounts
            | introspection_type @ IntrospectionType::ComputeHydrationTimes => {
                unreachable!("not append-only collection: {introspection_type:?}")
            }
        };
        if let Some(previous_statuses) = &mut self.previous_statuses {
            previous_statuses.extend(initial_statuses);
        }
    }
1301
1302    async fn run(mut self) {
1303        let mut interval = tokio::time::interval(Duration::from_millis(DEFAULT_TICK_MS));
1304
1305        const BATCH_SIZE: usize = 4096;
1306        let mut batch: Vec<(Vec<_>, _)> = Vec::with_capacity(BATCH_SIZE);
1307
1308        'run: loop {
1309            tokio::select! {
1310                // Prefer sending actual updates over just bumping the upper, because sending
1311                // updates also bumps the upper.
1312                biased;
1313
1314                // Listen for a shutdown signal so we can gracefully cleanup.
1315                _ = &mut self.shutdown_rx => {
1316                    let mut senders = Vec::new();
1317
1318                    // Prevent new messages from being sent.
1319                    self.rx.close();
1320
1321                    // Get as many waiting senders as possible.
1322                    while let Ok((_batch, sender)) = self.rx.try_recv() {
1323                        senders.push(sender);
1324                    }
1325
1326                    // Notify them that this collection is closed.
1327                    //
1328                    // Note: if a task is shutting down, that indicates the source has been
1329                    // dropped, at which point the identifier is invalid. Returning this
1330                    // error provides a better user experience.
1331                    notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));
1332
1333                    break 'run;
1334                }
1335
1336                // Pull a chunk of queued updates off the channel.
1337                count = self.rx.recv_many(&mut batch, BATCH_SIZE) => {
1338                    if count > 0 {
1339                        // To rate-limit appends to persist, we add artificial latency and will
1340                        // finish no sooner than this instant.
1341                        let batch_duration_ms = match self.id {
1342                            GlobalId::User(_) => Duration::from_millis(self.user_batch_duration_ms.load(Ordering::Relaxed)),
1343                            // For non-user collections, always just use the default.
1344                            _ => STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT,
1345                        };
1346                        let use_batch_now = Instant::now();
1347                        let min_time_to_complete = use_batch_now + batch_duration_ms;
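                        // For example (illustrative numbers): with a batch duration of
                        // 1000ms, a batch whose persist append finishes after 200ms still
                        // waits out the remaining 800ms before the next batch is pulled,
                        // so this task appends at most once per batch duration.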
1348
1349                        tracing::debug!(
1350                            ?use_batch_now,
1351                            ?batch_duration_ms,
1352                            ?min_time_to_complete,
1353                            "batch duration",
1354                        );
1355
1356                        // Reset the interval which is used to periodically bump the uppers
1357                        // because the uppers will get bumped with the following update. This
1358                        // makes it such that we will write at most once every `interval`.
1359                        //
1360                        // For example, let's say our `DEFAULT_TICK` interval is 10, so at
1361                        // `t + 10`, `t + 20`, ... we'll bump the uppers. If we receive an
1362                        // update at `t + 3` we want to shift this window so we bump the uppers
1363                        // at `t + 13`, `t + 23`, ... which resetting the interval accomplishes.
1364                        interval.reset();
1365
1366
1367                        let mut all_rows = Vec::with_capacity(batch.iter().map(|(rows, _)| rows.len()).sum());
1368                        let mut responders = Vec::with_capacity(batch.len());
1369
1370                        for (updates, responder) in batch.drain(..) {
1371                            let rows = self.process_updates(updates);
1372
1373                            all_rows.extend(rows.map(|(row, diff)| TimestamplessUpdate { row, diff }));
1374                            responders.push(responder);
1375                        }
1376
1377                        if self.read_only {
1378                            tracing::warn!(%self.id, ?all_rows, "append while in read-only mode");
1379                            notify_listeners(responders, || Err(StorageError::ReadOnly));
1380                            continue;
1381                        }
1382
1383                        // Append updates to persist!
1384                        let at_least = T::from((self.now)());
1385
1386                        if !all_rows.is_empty() {
1387                            monotonic_append(&mut self.write_handle, all_rows, at_least).await;
1388                        }
1389                        // Notify all of our listeners.
1390                        notify_listeners(responders, || Ok(()));
1391
1392                        // Wait until our artificial latency has completed.
1393                        //
1394                        // Note: if writing to persist took longer than the batch duration, this
1395                        // await will resolve immediately.
1396                        tokio::time::sleep_until(min_time_to_complete).await;
1397                    } else {
1398                        // Sender has been dropped, which means the collection should have been
1399                        // unregistered; break out of the run loop if we weren't already
1400                        // aborted.
1401                        break 'run;
1402                    }
1403                }
1404
1405                // If we haven't received any updates, then we'll move the upper forward.
1406                _ = interval.tick() => {
1407                    if self.read_only {
1408                        // Not bumping uppers while in read-only mode.
1409                        continue;
1410                    }
1411
1412                    // Update our collection.
1413                    let now = T::from((self.now)());
1414                    let updates = vec![];
1415                    let at_least = now.clone();
1416
1417                    // Failures don't matter when advancing collections' uppers. This might
1418                    // fail when a clusterd happens to be writing to this concurrently.
1419                    // Advancing uppers here is best-effort and only needs to succeed if no
1420                    // one else is advancing it; contention proves otherwise.
1421                    monotonic_append(&mut self.write_handle, updates, at_least).await;
1422                },
1423            }
1424        }
1425
1426        info!("write_task-{} ending", self.id);
1427    }
1428
1429    /// Deduplicates any [`mz_storage_client::client::StatusUpdate`] within `updates` and converts
1430    /// `updates` to rows and diffs.
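    ///
    /// A rough sketch of the dedup rule, with string statuses and a hypothetical
    /// `superseded_by` relation standing in for `Status::superseded_by`
    /// (illustrative only, not a doctest):
    ///
    /// ```ignore
    /// use std::collections::BTreeMap;
    ///
    /// // Assumption for this sketch: a repeat of the current status never supersedes it.
    /// fn superseded_by(old: &str, new: &str) -> bool {
    ///     old != new
    /// }
    ///
    /// let mut previous: BTreeMap<&str, &str> = BTreeMap::from([("source-1", "running")]);
    /// let incoming = vec![("source-1", "running"), ("source-1", "stalled")];
    /// let kept: Vec<_> = incoming
    ///     .into_iter()
    ///     .filter(|(id, new)| match previous.get(id) {
    ///         None => true,
    ///         Some(old) => superseded_by(old, new),
    ///     })
    ///     .collect();
    /// for &(id, status) in &kept {
    ///     previous.insert(id, status);
    /// }
    /// // The duplicate "running" update is dropped; the "stalled" update survives.
    /// assert_eq!(kept, vec![("source-1", "stalled")]);
    /// ```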
1431    fn process_updates(
1432        &mut self,
1433        updates: Vec<AppendOnlyUpdate>,
1434    ) -> impl Iterator<Item = (Row, Diff)> {
1435        let updates = if let Some(previous_statuses) = &mut self.previous_statuses {
1436            let new: Vec<_> = updates
1437                .into_iter()
1438                .filter(|r| match r {
1439                    AppendOnlyUpdate::Row(_) => true,
1440                    AppendOnlyUpdate::Status(update) => {
1441                        match (
1442                            previous_statuses
1443                                .get(&(update.id, update.replica_id))
1444                                .as_deref(),
1445                            &update.status,
1446                        ) {
1447                            (None, _) => true,
1448                            (Some(old), new) => old.superseded_by(*new),
1449                        }
1450                    }
1451                })
1452                .collect();
1453            previous_statuses.extend(new.iter().filter_map(|update| match update {
1454                AppendOnlyUpdate::Row(_) => None,
1455                AppendOnlyUpdate::Status(update) => {
1456                    Some(((update.id, update.replica_id), update.status))
1457                }
1458            }));
1459            new
1460        } else {
1461            updates
1462        };
1463
1464        updates.into_iter().map(AppendOnlyUpdate::into_row)
1465    }
1466}
1467
1468/// Truncates the given metrics history by removing all entries older than that history's
1469/// configured retention interval.
1470///
1471/// # Panics
1472///
1473/// Panics if `collection` is not a metrics history.
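///
/// The retractions are produced by inverting the diffs of rows that fall outside
/// the retention window, zeroing all other diffs, and consolidating. A minimal
/// sketch of that step, using plain `(timestamp_ms, diff)` pairs instead of
/// `Row`s (illustrative only, not a doctest):
///
/// ```ignore
/// let keep_since = 100u64;
/// let mut rows: Vec<(u64, i64)> = vec![(50, 1), (150, 1), (50, 1)];
/// for (occurred_at, diff) in &mut rows {
///     *diff = if *occurred_at < keep_since { -*diff } else { 0 };
/// }
/// // Consolidation sums diffs per key and drops zeros, leaving only retractions.
/// differential_dataflow::consolidation::consolidate(&mut rows);
/// assert_eq!(rows, vec![(50, -2)]);
/// ```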
1474async fn partially_truncate_metrics_history<T>(
1475    id: GlobalId,
1476    introspection_type: IntrospectionType,
1477    write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
1478    config_set: Arc<ConfigSet>,
1479    now: NowFn,
1480    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
1481) -> Result<(), anyhow::Error>
1482where
1483    T: Codec64 + From<EpochMillis> + TimestampManipulation,
1484{
1485    let (keep_duration, occurred_at_col) = match introspection_type {
1486        IntrospectionType::ReplicaMetricsHistory => (
1487            REPLICA_METRICS_HISTORY_RETENTION_INTERVAL.get(&config_set),
1488            REPLICA_METRICS_HISTORY_DESC
1489                .get_by_name(&ColumnName::from("occurred_at"))
1490                .expect("schema has not changed")
1491                .0,
1492        ),
1493        IntrospectionType::WallclockLagHistory => (
1494            WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL.get(&config_set),
1495            WALLCLOCK_LAG_HISTORY_DESC
1496                .get_by_name(&ColumnName::from("occurred_at"))
1497                .expect("schema has not changed")
1498                .0,
1499        ),
1500        IntrospectionType::WallclockLagHistogram => (
1501            WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL.get(&config_set),
1502            WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC
1503                .get_by_name(&ColumnName::from("period_start"))
1504                .expect("schema has not changed")
1505                .0,
1506        ),
1507        _ => panic!("not a metrics history: {introspection_type:?}"),
1508    };
1509
1510    let upper = write_handle.fetch_recent_upper().await;
1511    let Some(upper_ts) = upper.as_option() else {
1512        bail!("collection is sealed");
1513    };
1514    let Some(as_of_ts) = upper_ts.step_back() else {
1515        return Ok(()); // nothing to truncate
1516    };
1517
1518    let mut rows = storage_collections
1519        .snapshot(id, as_of_ts)
1520        .await
1521        .map_err(|e| anyhow!("reading snapshot: {e:?}"))?;
1522
1523    let now = mz_ore::now::to_datetime(now());
1524    let keep_since = now - keep_duration;
1525
1526    // Produce retractions by inverting diffs of rows we want to delete and setting the diffs
1527    // of all other rows to 0.
1528    for (row, diff) in &mut rows {
1529        let datums = row.unpack();
1530        let occurred_at = datums[occurred_at_col].unwrap_timestamptz();
1531        *diff = if *occurred_at < keep_since { -*diff } else { 0 };
1532    }
1533
1534    // Consolidate to avoid superfluous writes.
1535    consolidation::consolidate(&mut rows);
1536
1537    if rows.is_empty() {
1538        return Ok(());
1539    }
1540
1541    // It is very important that we append our retractions at the timestamp
1542    // right after the timestamp at which we got our snapshot. Otherwise,
1543    // it's possible for someone else to sneak in retractions or other
1544    // unexpected changes.
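    // Concretely (illustrative, assuming integer-like timestamps where step_back
    // and step_forward are -1/+1): if the snapshot was taken at `as_of = upper - 1`,
    // the retractions are written at `write_ts = upper` and the upper is advanced
    // to `upper + 1`.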
1545    let old_upper_ts = upper_ts.clone();
1546    let write_ts = old_upper_ts.clone();
1547    let new_upper_ts = TimestampManipulation::step_forward(&old_upper_ts);
1548
1549    let updates = rows
1550        .into_iter()
1551        .map(|(row, diff)| ((SourceData(Ok(row)), ()), write_ts.clone(), diff));
1552
1553    write_handle
1554        .compare_and_append(
1555            updates,
1556            Antichain::from_elem(old_upper_ts),
1557            Antichain::from_elem(new_upper_ts),
1558        )
1559        .await
1560        .expect("valid usage")
1561        .map_err(|e| anyhow!("appending retractions: {e:?}"))
1562}
1563
1564/// Effectively truncates the status history shard based on its retention policy.
1565///
1566/// NOTE: The history collections are really append-only collections, but
1567/// every now and then we want to retract old updates so that the collection
1568/// does not grow without bound. Crucially, these are _not_ incremental
1569/// collections: they are not derived from a state at some time `t`, and we
1570/// cannot maintain a desired state for them.
1571///
1572/// Returns a map with latest unpacked row per key.
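///
/// A minimal sketch of the `LastN` retention policy, using plain `u64` timestamps
/// in place of rows (illustrative only, not a doctest):
///
/// ```ignore
/// use std::cmp::Reverse;
/// use std::collections::BinaryHeap;
///
/// let n: usize = 2;
/// let mut entries: BinaryHeap<Reverse<u64>> = BinaryHeap::new();
/// let mut deletions = Vec::new();
/// for ts in [10u64, 30, 20, 40] {
///     entries.push(Reverse(ts));
///     while entries.len() > n {
///         // Popping the `Reverse` min-heap yields the oldest retained entry,
///         // which we mark for deletion.
///         if let Some(Reverse(old)) = entries.pop() {
///             deletions.push(old);
///         }
///     }
/// }
/// assert_eq!(deletions, vec![10, 20]); // only the two newest (30, 40) survive
/// ```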
1573pub(crate) async fn partially_truncate_status_history<T, K>(
1574    id: GlobalId,
1575    introspection_type: IntrospectionType,
1576    write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
1577    status_history_desc: StatusHistoryDesc<K>,
1578    now: NowFn,
1579    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
1580) -> BTreeMap<K, Row>
1581where
1582    T: Codec64 + From<EpochMillis> + TimestampManipulation,
1583    K: Clone + Debug + Ord + Send + Sync,
1584{
1585    let upper = write_handle.fetch_recent_upper().await.clone();
1586
1587    let mut rows = match upper.as_option() {
1588        Some(f) if f > &T::minimum() => {
1589            let as_of = f.step_back().unwrap();
1590
1591            storage_collections
1592                .snapshot(id, as_of)
1593                .await
1594                .expect("snapshot succeeds")
1595        }
1596        // If the collection is closed or the frontier is the minimum, we cannot
1597        // or don't need to truncate (respectively).
1598        _ => return BTreeMap::new(),
1599    };
1600
1601    // BTreeMap to keep track of the row with the latest timestamp for each key.
1602    let mut latest_row_per_key: BTreeMap<K, (CheckedTimestamp<DateTime<Utc>>, Row)> =
1603        BTreeMap::new();
1604
1605    // Consolidate the snapshot, so we can process it correctly below.
1606    differential_dataflow::consolidation::consolidate(&mut rows);
1607
1608    let mut deletions = vec![];
1609
1610    let mut handle_row = {
1611        let latest_row_per_key = &mut latest_row_per_key;
1612        move |row: &Row, diff| {
1613            let datums = row.unpack();
1614            let key = (status_history_desc.extract_key)(&datums);
1615            let timestamp = (status_history_desc.extract_time)(&datums);
1616
1617            assert!(
1618                diff > 0,
1619                "only know how to operate over consolidated data with diffs > 0, \
1620                    found diff {diff} for object {key:?} in {introspection_type:?}",
1621            );
1622
1623            // Keep track of the timestamp of the latest row per key.
1624            match latest_row_per_key.get(&key) {
1625                Some(existing) if &existing.0 > &timestamp => {}
1626                _ => {
1627                    latest_row_per_key.insert(key.clone(), (timestamp, row.clone()));
1628                }
1629            };
1630            (key, timestamp)
1631        }
1632    };
1633
1634    match status_history_desc.retention_policy {
1635        StatusHistoryRetentionPolicy::LastN(n) => {
1636            // BTreeMap of min-heaps used to retain the most recent `n` events for each key.
1637            let mut last_n_entries_per_key: BTreeMap<
1638                K,
1639                BinaryHeap<Reverse<(CheckedTimestamp<DateTime<Utc>>, Row)>>,
1640            > = BTreeMap::new();
1641
1642            for (row, diff) in rows {
1643                let (key, timestamp) = handle_row(&row, diff);
1644
1645                // Duplicate rows ARE possible if many status changes happen in VERY quick succession,
1646                // so we handle duplicated rows separately.
1647                let entries = last_n_entries_per_key.entry(key).or_default();
1648                for _ in 0..diff {
1649                    // We CAN have multiple statuses (most likely Starting and Running) at the exact same
1650                    // millisecond, depending on how the `health_operator` is scheduled.
1651                    //
1652                    // Note that these will be arbitrarily ordered, so a Starting event might
1653                    // survive and a Running one won't. The next restart will remove the other,
1654                    // so we don't bother being careful about it.
1655                    //
1656                    // TODO(guswynn): unpack these into health-status objects and use
1657                    // their `Ord` impl.
1658                    entries.push(Reverse((timestamp, row.clone())));
1659
1660                    // Retain some number of entries, using pop to mark the oldest entries for
1661                    // deletion.
1662                    while entries.len() > n {
1663                        if let Some(Reverse((_, r))) = entries.pop() {
1664                            deletions.push(r);
1665                        }
1666                    }
1667                }
1668            }
1669        }
1670        StatusHistoryRetentionPolicy::TimeWindow(time_window) => {
1671            // Get the lower bound of our retention window
1672            let now = mz_ore::now::to_datetime(now());
1673            let keep_since = now - time_window;
1674
1675            // Mark any row outside the retention window for deletion
1676            for (row, diff) in rows {
1677                let (_, timestamp) = handle_row(&row, diff);
1678
1679                if *timestamp < keep_since {
1680                    deletions.push(row);
1681                }
1682            }
1683        }
1684    }
1685
1686    // It is very important that we append our retractions at the timestamp
1687    // right after the timestamp at which we got our snapshot. Otherwise,
1688    // it's possible for someone else to sneak in retractions or other
1689    // unexpected changes.
1690    let expected_upper = upper.into_option().expect("checked above");
1691    let new_upper = TimestampManipulation::step_forward(&expected_upper);
1692
1693    // Updates are only deletes because everything else is already in the shard.
1694    let updates = deletions
1695        .into_iter()
1696        .map(|row| ((SourceData(Ok(row)), ()), expected_upper.clone(), -1))
1697        .collect::<Vec<_>>();
1698
1699    let res = write_handle
1700        .compare_and_append(
1701            updates,
1702            Antichain::from_elem(expected_upper.clone()),
1703            Antichain::from_elem(new_upper),
1704        )
1705        .await
1706        .expect("usage was valid");
1707
1708    match res {
1709        Ok(_) => {
1710            // All good, yay!
1711        }
1712        Err(err) => {
1713            // This is fine; it just means the upper moved because
1714            // of continual upper advancement or because someone
1715            // already appended some more retractions/updates.
1716            //
1717            // NOTE: We might want to attempt these partial
1718            // retractions on an interval, instead of only when
1719            // starting up!
1720            info!(
1721                %id, ?expected_upper, current_upper = ?err.current,
1722                "failed to append partial truncation",
1723            );
1724        }
1725    }
1726
1727    latest_row_per_key
1728        .into_iter()
1729        .map(|(key, (_, row))| (key, row))
1730        .collect()
1731}
1732
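/// Appends `updates` to the collection behind `write_handle` at a timestamp that
/// is at least `at_least` and not before the collection's current upper, retrying
/// on upper mismatches until the append succeeds.
///
/// A rough sketch of how the write timestamp and new upper are chosen, using plain
/// `u64` timestamps (the helper name here is hypothetical, for illustration only):
///
/// ```ignore
/// fn choose_write_ts(upper: u64, at_least: u64) -> (u64, u64) {
///     // Write at whichever is later, and advance the upper one step past it.
///     let write_ts = upper.max(at_least);
///     (write_ts, write_ts + 1)
/// }
/// assert_eq!(choose_write_ts(5, 9), (9, 10));
/// assert_eq!(choose_write_ts(12, 9), (12, 13));
/// ```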
1733async fn monotonic_append<T: Timestamp + Lattice + Codec64 + TimestampManipulation>(
1734    write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
1735    updates: Vec<TimestamplessUpdate>,
1736    at_least: T,
1737) {
1738    let mut expected_upper = write_handle.shared_upper();
1739    loop {
1740        if updates.is_empty() && expected_upper.is_empty() {
1741            // Ignore timestamp advancement for closed
1742            // collections. TODO: consider making this a
1743            // correctable error.
1744            return;
1745        }
1746
1747        let upper = expected_upper
1748            .into_option()
1749            .expect("cannot append data to closed collection");
1750
1751        let lower = if upper.less_than(&at_least) {
1752            at_least.clone()
1753        } else {
1754            upper.clone()
1755        };
1756
1757        let new_upper = TimestampManipulation::step_forward(&lower);
1758        let updates = updates
1759            .iter()
1760            .map(|TimestamplessUpdate { row, diff }| {
1761                (
1762                    (SourceData(Ok(row.clone())), ()),
1763                    lower.clone(),
1764                    diff.into_inner(),
1765                )
1766            })
1767            .collect::<Vec<_>>();
1768        let res = write_handle
1769            .compare_and_append(
1770                updates,
1771                Antichain::from_elem(upper),
1772                Antichain::from_elem(new_upper),
1773            )
1774            .await
1775            .expect("valid usage");
1776        match res {
1777            Ok(()) => return,
1778            Err(err) => {
1779                expected_upper = err.current;
1780                continue;
1781            }
1782        }
1783    }
1784}
1785
1786// Helper function for notifying listeners.
1787fn notify_listeners<T>(
1788    responders: impl IntoIterator<Item = oneshot::Sender<T>>,
1789    result: impl Fn() -> T,
1790) {
1791    for r in responders {
1792        // We don't care if the listener disappeared.
1793        let _ = r.send(result());
1794    }
1795}
1796
1797#[cfg(test)]
1798mod tests {
1799    use std::collections::BTreeSet;
1800
1801    use super::*;
1802    use mz_repr::{Datum, Row};
1803    use mz_storage_client::client::StatusUpdate;
1804    use mz_storage_client::healthcheck::{
1805        MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC,
1806    };
1807
1808    #[mz_ore::test]
1809    fn test_row() {
1810        let error_message = "error message";
1811        let hint = "hint message";
1812        let id = GlobalId::User(1);
1813        let status = Status::Dropped;
1814        let row = Row::from(StatusUpdate {
1815            id,
1816            timestamp: chrono::offset::Utc::now(),
1817            status,
1818            error: Some(error_message.to_string()),
1819            hints: BTreeSet::from([hint.to_string()]),
1820            namespaced_errors: Default::default(),
1821            replica_id: None,
1822        });
1823
1824        for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
1825            assert!(datum.is_instance_of(column_type));
1826        }
1827
1828        for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
1829            assert!(datum.is_instance_of(column_type));
1830        }
1831
1832        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
1833        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
1834        assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
1835
1836        let details = row
1837            .iter()
1838            .nth(4)
1839            .unwrap()
1840            .unwrap_map()
1841            .iter()
1842            .collect::<Vec<_>>();
1843
1844        assert_eq!(details.len(), 1);
1845        let hint_datum = &details[0];
1846
1847        assert_eq!(hint_datum.0, "hints");
1848        assert_eq!(
1849            hint_datum.1.unwrap_list().iter().next().unwrap(),
1850            Datum::String(hint)
1851        );
1852    }
1853
1854    #[mz_ore::test]
1855    fn test_row_without_hint() {
1856        let error_message = "error message";
1857        let id = GlobalId::User(1);
1858        let status = Status::Dropped;
1859        let row = Row::from(StatusUpdate {
1860            id,
1861            timestamp: chrono::offset::Utc::now(),
1862            status,
1863            error: Some(error_message.to_string()),
1864            hints: Default::default(),
1865            namespaced_errors: Default::default(),
1866            replica_id: None,
1867        });
1868
1869        for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
1870            assert!(datum.is_instance_of(column_type));
1871        }
1872
1873        for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
1874            assert!(datum.is_instance_of(column_type));
1875        }
1876
1877        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
1878        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
1879        assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
1880        assert_eq!(row.iter().nth(4).unwrap(), Datum::Null);
1881    }
1882
1883    #[mz_ore::test]
1884    fn test_row_without_error() {
1885        let id = GlobalId::User(1);
1886        let status = Status::Dropped;
1887        let hint = "hint message";
1888        let row = Row::from(StatusUpdate {
1889            id,
1890            timestamp: chrono::offset::Utc::now(),
1891            status,
1892            error: None,
1893            hints: BTreeSet::from([hint.to_string()]),
1894            namespaced_errors: Default::default(),
1895            replica_id: None,
1896        });
1897
1898        for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
1899            assert!(datum.is_instance_of(column_type));
1900        }
1901
1902        for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
1903            assert!(datum.is_instance_of(column_type));
1904        }
1905
1906        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
1907        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
1908        assert_eq!(row.iter().nth(3).unwrap(), Datum::Null);
1909
1910        let details = row
1911            .iter()
1912            .nth(4)
1913            .unwrap()
1914            .unwrap_map()
1915            .iter()
1916            .collect::<Vec<_>>();
1917
1918        assert_eq!(details.len(), 1);
1919        let hint_datum = &details[0];
1920
1921        assert_eq!(hint_datum.0, "hints");
1922        assert_eq!(
1923            hint_datum.1.unwrap_list().iter().next().unwrap(),
1924            Datum::String(hint)
1925        );
1926    }
1927
1928    #[mz_ore::test]
1929    fn test_row_with_namespaced() {
1930        let error_message = "error message";
1931        let id = GlobalId::User(1);
1932        let status = Status::Dropped;
1933        let row = Row::from(StatusUpdate {
1934            id,
1935            timestamp: chrono::offset::Utc::now(),
1936            status,
1937            error: Some(error_message.to_string()),
1938            hints: Default::default(),
1939            namespaced_errors: BTreeMap::from([("thing".to_string(), "error".to_string())]),
1940            replica_id: None,
1941        });
1942
1943        for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
1944            assert!(datum.is_instance_of(column_type));
1945        }
1946
1947        for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
1948            assert!(datum.is_instance_of(column_type));
1949        }
1950
1951        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
1952        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
1953        assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
1954
1955        let details = row
1956            .iter()
1957            .nth(4)
1958            .unwrap()
1959            .unwrap_map()
1960            .iter()
1961            .collect::<Vec<_>>();
1962
1963        assert_eq!(details.len(), 1);
1964        let ns_datum = &details[0];
1965
1966        assert_eq!(ns_datum.0, "namespaced");
1967        assert_eq!(
1968            ns_datum.1.unwrap_map().iter().next().unwrap(),
1969            ("thing", Datum::String("error"))
1970        );
1971    }
1972
1973    #[mz_ore::test]
1974    fn test_row_with_everything() {
1975        let error_message = "error message";
1976        let hint = "hint message";
1977        let id = GlobalId::User(1);
1978        let status = Status::Dropped;
1979        let row = Row::from(StatusUpdate {
1980            id,
1981            timestamp: chrono::offset::Utc::now(),
1982            status,
1983            error: Some(error_message.to_string()),
1984            hints: BTreeSet::from([hint.to_string()]),
1985            namespaced_errors: BTreeMap::from([("thing".to_string(), "error".to_string())]),
1986            replica_id: None,
1987        });
1988
1989        for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
1990            assert!(datum.is_instance_of(column_type));
1991        }
1992
1993        for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
1994            assert!(datum.is_instance_of(column_type));
1995        }
1996
1997        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
1998        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
1999        assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
2000
2001        let details = row
2002            .iter()
2003            .nth(4)
2004            .unwrap()
2005            .unwrap_map()
2006            .iter()
2007            .collect::<Vec<_>>();
2008
2009        assert_eq!(details.len(), 2);
2010        // These are always sorted
2011        let hint_datum = &details[0];
2012        let ns_datum = &details[1];
2013
2014        assert_eq!(hint_datum.0, "hints");
2015        assert_eq!(
2016            hint_datum.1.unwrap_list().iter().next().unwrap(),
2017            Datum::String(hint)
2018        );
2019
2020        assert_eq!(ns_datum.0, "namespaced");
2021        assert_eq!(
2022            ns_datum.1.unwrap_map().iter().next().unwrap(),
2023            ("thing", Datum::String("error"))
2024        );
2025    }
2026}