mz_storage_controller/
collection_mgmt.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Tokio tasks (and support machinery) for maintaining storage-managed
11//! collections.
12//!
13//! We differentiate between append-only collections and differential
14//! collections. The intent is that knowing the type allows being more
15//! intentional about what state we keep in memory and how we work when in
16//! read-only mode / during zero-downtime upgrades.
17//!
18//! ## Append-only collections
19//!
20//! Writers only append blind writes. Those writes never fail. It does not
21//! matter at what timestamp they happen (to a degree, but ...).
22//!
23//! While in read-only mode, the append-only write task can immediately write
24//! updates out as batches, but only append them when going out of read-only
25//! mode.
26//!
27//! ## Differential collections
28//!
29//! These are very similar to the self-correcting persist_sink. We have an
30//! in-memory desired state and continually make it so that persist matches
31//! desired. As described below (in the task implementation), we could do this
32//! in a memory efficient way by keeping open a persist read handle and
33//! continually updating/consolidating our desired collection. This way, we
34//! would be memory-efficient even in read-only mode.
35//!
36//! This is an evolution of the current design where, on startup, we bring the
37//! persist collection into a known state (mostly by retracting everything) and
38//! then assume that this `envd` is the only writer. We panic when that is ever
39//! not the case, which we notice when the upper of a collection changes
40//! unexpectedly. With this new design we can instead continually update our
41//! view of the persist shard and emit updates when needed, when desired
42//! changed.
43//!
44//! NOTE: As it is, we always keep all of desired in memory. Only when told to
45//! go out of read-only mode would we start attempting to write.
46//!
47//! ## Read-only mode
48//!
49//! When [`CollectionManager`] is in read-only mode it cannot write out to
50//! persist. It will, however, maintain the `desired` state of differential
51//! collections so that we can immediately start writing out updates when going
52//! out of read-only mode.
53//!
54//! For append-only collections we either panic, in the case of
55//! [`CollectionManager::blind_write`], or report back a
56//! [`StorageError::ReadOnly`] when trying to append through a
57//! [`MonotonicAppender`] returned from
58//! [`CollectionManager::monotonic_appender`].
59
60use std::any::Any;
61use std::cmp::Reverse;
62use std::collections::{BTreeMap, BinaryHeap};
63use std::fmt::Debug;
64use std::ops::ControlFlow;
65use std::pin::Pin;
66use std::str::FromStr;
67use std::sync::atomic::{AtomicU64, Ordering};
68use std::sync::{Arc, Mutex};
69
70use anyhow::{anyhow, bail};
71use chrono::{DateTime, Utc};
72use differential_dataflow::consolidation;
73use differential_dataflow::lattice::Lattice;
74use futures::future::BoxFuture;
75use futures::stream::StreamExt;
76use futures::{Future, FutureExt};
77use mz_cluster_client::ReplicaId;
78use mz_dyncfg::ConfigSet;
79use mz_ore::now::{EpochMillis, NowFn};
80use mz_ore::retry::Retry;
81use mz_ore::soft_panic_or_log;
82use mz_ore::task::AbortOnDropHandle;
83use mz_persist_client::batch::Added;
84use mz_persist_client::read::ReadHandle;
85use mz_persist_client::write::WriteHandle;
86use mz_persist_types::Codec64;
87use mz_repr::adt::timestamp::CheckedTimestamp;
88use mz_repr::{ColumnName, Diff, GlobalId, Row, TimestampManipulation};
89use mz_storage_client::client::{AppendOnlyUpdate, Status, TimestamplessUpdate};
90use mz_storage_client::controller::{IntrospectionType, MonotonicAppender, StorageWriteOp};
91use mz_storage_client::healthcheck::{
92    MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_METRICS_HISTORY_DESC,
93    WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC, WALLCLOCK_LAG_HISTORY_DESC,
94};
95use mz_storage_client::metrics::StorageControllerMetrics;
96use mz_storage_client::statistics::ControllerSinkStatistics;
97use mz_storage_client::storage_collections::StorageCollections;
98use mz_storage_types::StorageDiff;
99use mz_storage_types::controller::InvalidUpper;
100use mz_storage_types::dyncfgs::{
101    REPLICA_METRICS_HISTORY_RETENTION_INTERVAL, WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL,
102    WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL,
103};
104use mz_storage_types::parameters::{
105    STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT, StorageParameters,
106};
107use mz_storage_types::sources::SourceData;
108use timely::progress::{Antichain, Timestamp};
109use tokio::sync::{mpsc, oneshot, watch};
110use tokio::time::{Duration, Instant};
111use tracing::{debug, error, info};
112
113use crate::{
114    StatusHistoryDesc, StatusHistoryRetentionPolicy, StorageError, collection_mgmt,
115    privatelink_status_history_desc, replica_status_history_desc, sink_status_history_desc,
116    snapshot_statistics, source_status_history_desc, statistics,
117};
118
/// Default rate, in milliseconds, at which we advance the uppers of managed collections.
const DEFAULT_TICK_MS: u64 = 1_000;

/// A channel for sending writes to a differential collection.
///
/// Each message pairs the write operation with a oneshot sender on which the
/// result of the write can be reported back.
type DifferentialWriteChannel<T> =
    mpsc::UnboundedSender<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>;

/// A channel for sending writes to an append-only collection.
///
/// Each message pairs a batch of updates with a oneshot sender on which the
/// result of the write can be reported back.
type AppendOnlyWriteChannel<T> = mpsc::UnboundedSender<(
    Vec<AppendOnlyUpdate>,
    oneshot::Sender<Result<(), StorageError<T>>>,
)>;

/// Handle to a spawned write task.
// NOTE(review): going by the type name, dropping this handle aborts the task — confirm against
// `mz_ore::task::AbortOnDropHandle`.
type WriteTask = AbortOnDropHandle<()>;
/// Sender used to notify a write task that it needs to shut down.
type ShutdownSender = oneshot::Sender<()>;
134
/// Types of storage-managed/introspection collections:
///
/// Append-only: Only accepts blind writes, writes that can be applied at any
/// timestamp and don’t depend on current collection contents.
///
/// Pseudo append-only: We treat them largely as append-only collections but
/// periodically (currently on bootstrap) retract old updates from them.
///
/// Differential: at any given time `t`, collection contents mirrors some
/// (small cardinality) state. The cardinality of the collection stays constant
/// if the thing that is mirrored doesn’t change in cardinality. At steady
/// state, updates always come in pairs of retractions/additions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectionManagerKind {
    /// The collection only accepts blind writes.
    AppendOnly,
    /// The collection contents mirror some desired state.
    Differential,
}
151
/// Tracks the write tasks of storage-managed collections.
///
/// Differential and append-only collections are kept in separate maps, each
/// keyed by [`GlobalId`]. Every entry holds the channel for sending writes to
/// the collection's task, the handle of that task, and the sender used to ask
/// the task to shut down.
///
/// Cloning is shallow: the maps and the batch duration sit behind [`Arc`]s, so
/// all clones share them.
#[derive(Debug, Clone)]
pub struct CollectionManager<T>
where
    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
    /// When a [`CollectionManager`] is in read-only mode it must not affect any
    /// changes to external state.
    read_only: bool,

    // WIP: Name TBD! I thought about `managed_collections`, `ivm_collections`,
    // `self_correcting_collections`.
    /// These are collections that we write to by adding/removing updates to an
    /// internal _desired_ collection. The `CollectionManager` continually makes
    /// sure that collection contents (in persist) match the desired state.
    differential_collections:
        Arc<Mutex<BTreeMap<GlobalId, (DifferentialWriteChannel<T>, WriteTask, ShutdownSender)>>>,

    /// Collections that we only append to using blind-writes.
    ///
    /// Every write succeeds at _some_ timestamp, and we never check what the
    /// actual contents of the collection (in persist) are.
    append_only_collections:
        Arc<Mutex<BTreeMap<GlobalId, (AppendOnlyWriteChannel<T>, WriteTask, ShutdownSender)>>>,

    /// Amount of time we'll wait before sending a batch of inserts to Persist, for user
    /// collections.
    ///
    /// Stored in milliseconds and shared with every spawned append-only write task.
    user_batch_duration_ms: Arc<AtomicU64>,
    /// Clock function; a clone is handed to every spawned write task.
    now: NowFn,
}
181
182/// The `CollectionManager` provides two complementary functions:
183/// - Providing an API to append values to a registered set of collections.
184///   For this usecase:
185///     - The `CollectionManager` expects to be the only writer.
186///     - Appending to a closed collection panics
187/// - Automatically advancing the timestamp of managed collections every
188///   second. For this usecase:
189///     - The `CollectionManager` handles contention by permitting and ignoring errors.
190///     - Closed collections will not panic if they continue receiving these requests.
191impl<T> CollectionManager<T>
192where
193    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
194{
195    pub(super) fn new(read_only: bool, now: NowFn) -> CollectionManager<T> {
196        let batch_duration_ms: u64 = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT
197            .as_millis()
198            .try_into()
199            .expect("known to fit");
200
201        CollectionManager {
202            read_only,
203            differential_collections: Arc::new(Mutex::new(BTreeMap::new())),
204            append_only_collections: Arc::new(Mutex::new(BTreeMap::new())),
205            user_batch_duration_ms: Arc::new(AtomicU64::new(batch_duration_ms)),
206            now,
207        }
208    }
209
210    /// Updates the duration we'll wait to batch events for user owned collections.
211    pub fn update_user_batch_duration(&self, duration: Duration) {
212        tracing::info!(?duration, "updating user batch duration");
213        let millis: u64 = duration.as_millis().try_into().unwrap_or(u64::MAX);
214        self.user_batch_duration_ms.store(millis, Ordering::Relaxed);
215    }
216
217    /// Registers a new _differential collection_.
218    ///
219    /// The [CollectionManager] will automatically advance the upper of every
220    /// registered collection every second.
221    ///
222    /// Update the `desired` state of a differential collection using
223    /// [Self::differential_write].
224    pub(super) fn register_differential_collection<R>(
225        &self,
226        id: GlobalId,
227        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
228        read_handle_fn: R,
229        force_writable: bool,
230        introspection_config: DifferentialIntrospectionConfig<T>,
231    ) where
232        R: FnMut() -> Pin<
233                Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>,
234            > + Send
235            + Sync
236            + 'static,
237    {
238        let mut guard = self
239            .differential_collections
240            .lock()
241            .expect("collection_mgmt panicked");
242
243        // Check if this collection is already registered.
244        if let Some((_writer, task, _shutdown_tx)) = guard.get(&id) {
245            // The collection is already registered and the task is still running so nothing to do.
246            if !task.is_finished() {
247                // TODO(parkmycar): Panic here if we never see this error in production.
248                tracing::error!("Registered a collection twice! {id:?}");
249                return;
250            }
251        }
252
253        let read_only = self.get_read_only(id, force_writable);
254
255        // Spawns a new task so we can write to this collection.
256        let writer_and_handle = DifferentialWriteTask::spawn(
257            id,
258            write_handle,
259            read_handle_fn,
260            read_only,
261            self.now.clone(),
262            introspection_config,
263        );
264        let prev = guard.insert(id, writer_and_handle);
265
266        // Double check the previous task was actually finished.
267        if let Some((_, prev_task, _)) = prev {
268            assert!(
269                prev_task.is_finished(),
270                "should only spawn a new task if the previous is finished"
271            );
272        }
273    }
274
275    /// Registers a new _append-only collection_.
276    ///
277    /// The [CollectionManager] will automatically advance the upper of every
278    /// registered collection every second.
279    pub(super) fn register_append_only_collection(
280        &self,
281        id: GlobalId,
282        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
283        force_writable: bool,
284        introspection_config: Option<AppendOnlyIntrospectionConfig<T>>,
285    ) {
286        let mut guard = self
287            .append_only_collections
288            .lock()
289            .expect("collection_mgmt panicked");
290
291        // Check if this collection is already registered.
292        if let Some((_writer, task, _shutdown_tx)) = guard.get(&id) {
293            // The collection is already registered and the task is still running so nothing to do.
294            if !task.is_finished() {
295                // TODO(parkmycar): Panic here if we never see this error in production.
296                tracing::error!("Registered a collection twice! {id:?}");
297                return;
298            }
299        }
300
301        let read_only = self.get_read_only(id, force_writable);
302
303        // Spawns a new task so we can write to this collection.
304        let writer_and_handle = AppendOnlyWriteTask::spawn(
305            id,
306            write_handle,
307            read_only,
308            self.now.clone(),
309            Arc::clone(&self.user_batch_duration_ms),
310            introspection_config,
311        );
312        let prev = guard.insert(id, writer_and_handle);
313
314        // Double check the previous task was actually finished.
315        if let Some((_, prev_task, _)) = prev {
316            assert!(
317                prev_task.is_finished(),
318                "should only spawn a new task if the previous is finished"
319            );
320        }
321    }
322
323    /// Unregisters the given collection.
324    ///
325    /// Also waits until the `CollectionManager` has completed all outstanding work to ensure that
326    /// it has stopped referencing the provided `id`.
327    #[mz_ore::instrument(level = "debug")]
328    pub(super) fn unregister_collection(&self, id: GlobalId) -> BoxFuture<'static, ()> {
329        let prev = self
330            .differential_collections
331            .lock()
332            .expect("CollectionManager panicked")
333            .remove(&id);
334
335        // Wait for the task to complete before reporting as unregisted.
336        if let Some((_prev_writer, prev_task, shutdown_tx)) = prev {
337            // Notify the task it needs to shutdown.
338            //
339            // We can ignore errors here because they indicate the task is already done.
340            let _ = shutdown_tx.send(());
341            return Box::pin(prev_task.map(|_| ()));
342        }
343
344        let prev = self
345            .append_only_collections
346            .lock()
347            .expect("CollectionManager panicked")
348            .remove(&id);
349
350        // Wait for the task to complete before reporting as unregisted.
351        if let Some((_prev_writer, prev_task, shutdown_tx)) = prev {
352            // Notify the task it needs to shutdown.
353            //
354            // We can ignore errors here because they indicate the task is already done.
355            let _ = shutdown_tx.send(());
356            return Box::pin(prev_task.map(|_| ()));
357        }
358
359        Box::pin(futures::future::ready(()))
360    }
361
362    /// Returns a sender for writes to the given append-only collection.
363    ///
364    /// # Panics
365    /// - If `id` does not belong to an append-only collections.
366    pub(super) fn append_only_write_sender(&self, id: GlobalId) -> AppendOnlyWriteChannel<T> {
367        let collections = self.append_only_collections.lock().expect("poisoned");
368        match collections.get(&id) {
369            Some((tx, _, _)) => tx.clone(),
370            None => panic!("missing append-only collection: {id}"),
371        }
372    }
373
374    /// Returns a sender for writes to the given differential collection.
375    ///
376    /// # Panics
377    /// - If `id` does not belong to a differential collections.
378    pub(super) fn differential_write_sender(&self, id: GlobalId) -> DifferentialWriteChannel<T> {
379        let collections = self.differential_collections.lock().expect("poisoned");
380        match collections.get(&id) {
381            Some((tx, _, _)) => tx.clone(),
382            None => panic!("missing differential collection: {id}"),
383        }
384    }
385
386    /// Appends `updates` to the append-only collection identified by `id`, at
387    /// _some_ timestamp. Does not wait for the append to complete.
388    ///
389    /// # Panics
390    /// - If `id` does not belong to an append-only collections.
391    /// - If this [`CollectionManager`] is in read-only mode.
392    /// - If the collection closed.
393    pub(super) fn blind_write(&self, id: GlobalId, updates: Vec<AppendOnlyUpdate>) {
394        if self.read_only {
395            panic!("attempting blind write to {} while in read-only mode", id);
396        }
397
398        if updates.is_empty() {
399            return;
400        }
401
402        let collections = self.append_only_collections.lock().expect("poisoned");
403        match collections.get(&id) {
404            Some((update_tx, _, _)) => {
405                let (tx, _rx) = oneshot::channel();
406                update_tx.send((updates, tx)).expect("rx hung up");
407            }
408            None => panic!("missing append-only collection: {id}"),
409        }
410    }
411
412    /// Updates the desired collection state of the differential collection identified by
413    /// `id`. The underlying persist shard will reflect this change at
414    /// _some_point. Does not wait for the change to complete.
415    ///
416    /// # Panics
417    /// - If `id` does not belong to a differential collection.
418    /// - If the collection closed.
419    pub(super) fn differential_write(&self, id: GlobalId, op: StorageWriteOp) {
420        if op.is_empty_append() {
421            return;
422        }
423
424        let collections = self.differential_collections.lock().expect("poisoned");
425        match collections.get(&id) {
426            Some((update_tx, _, _)) => {
427                let (tx, _rx) = oneshot::channel();
428                update_tx.send((op, tx)).expect("rx hung up");
429            }
430            None => panic!("missing differential collection: {id}"),
431        }
432    }
433
434    /// Appends the given `updates` to the differential collection identified by `id`.
435    ///
436    /// # Panics
437    /// - If `id` does not belong to a differential collection.
438    /// - If the collection closed.
439    pub(super) fn differential_append(&self, id: GlobalId, updates: Vec<(Row, Diff)>) {
440        self.differential_write(id, StorageWriteOp::Append { updates })
441    }
442
443    /// Returns a [`MonotonicAppender`] that can be used to monotonically append updates to the
444    /// collection correlated with `id`.
445    pub(super) fn monotonic_appender(
446        &self,
447        id: GlobalId,
448    ) -> Result<MonotonicAppender<T>, StorageError<T>> {
449        let guard = self
450            .append_only_collections
451            .lock()
452            .expect("CollectionManager panicked");
453        let tx = guard
454            .get(&id)
455            .map(|(tx, _, _)| tx.clone())
456            .ok_or(StorageError::IdentifierMissing(id))?;
457
458        Ok(MonotonicAppender::new(tx))
459    }
460
461    fn get_read_only(&self, id: GlobalId, force_writable: bool) -> bool {
462        if force_writable {
463            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
464            false
465        } else {
466            self.read_only
467        }
468    }
469}
470
/// Everything a [`DifferentialWriteTask`] needs to prepare a differential
/// introspection collection before it starts writing to it.
pub(crate) struct DifferentialIntrospectionConfig<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    /// Upper that is passed to `snapshot_statistics` when seeding the
    /// statistics scrapers.
    pub(crate) recent_upper: Antichain<T>,
    /// The kind of introspection collection; selects which preparation work
    /// is done.
    pub(crate) introspection_type: IntrospectionType,
    /// Passed to `snapshot_statistics` when seeding the statistics scrapers.
    pub(crate) storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// Handed to the statistics scraper task.
    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,
    /// Shared source statistics, handed to the source and webhook statistics
    /// scrapers.
    pub(crate) source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    /// Shared sink statistics, handed to the sink statistics scraper.
    pub(crate) sink_statistics:
        Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    /// Interval handed to the statistics scrapers.
    pub(crate) statistics_interval: Duration,
    /// Watch channel handed to the statistics scrapers alongside
    /// `statistics_interval`.
    // NOTE(review): presumably carries updates to the interval — confirm.
    pub(crate) statistics_interval_receiver: watch::Receiver<Duration>,
    /// Handed to the statistics scraper.
    // NOTE(review): presumably how long statistics are retained — confirm.
    pub(crate) statistics_retention_duration: Duration,
    /// Handed to the statistics scraper.
    pub(crate) metrics: StorageControllerMetrics,
    /// Tokens of spawned scraper tasks, keyed by collection id. They are kept
    /// here so that they get dropped, and the tasks stop, when the controller
    /// is dropped.
    pub(crate) introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,
}
488
/// A task that will make it so that the state in persist matches the desired
/// state and continuously bump the upper for the specified collection.
///
/// NOTE: This implementation is a bit clunky, and could be optimized by not keeping
/// all of desired in memory (see comment below). It is meant to showcase the
/// general approach.
struct DifferentialWriteTask<T, R>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
    R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>>
        + Send
        + 'static,
{
    /// The collection that we are writing to.
    id: GlobalId,

    /// Handle used for writing to the persist shard of the collection.
    write_handle: WriteHandle<SourceData, (), T, StorageDiff>,

    /// For getting a [`ReadHandle`] to sync our state to persist contents.
    read_handle_fn: R,

    /// While set, the task does not bump the upper on ticks and skips the
    /// preparation step on startup.
    read_only: bool,

    /// Clock function; the upper is bumped to the time it reports.
    now: NowFn,

    /// In the absence of updates, we regularly bump the upper to "now", on this
    /// interval. This makes it so the collection remains readable at recent
    /// timestamps.
    upper_tick_interval: tokio::time::Interval,

    /// Receiver for write commands. These change our desired state.
    cmd_rx: mpsc::UnboundedReceiver<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>,

    /// We have to shut down when receiving from this.
    shutdown_rx: oneshot::Receiver<()>,

    /// The contents of the collection as it should be according to whoever is
    /// driving us around.
    // This is memory inefficient: we always keep a full copy of
    // desired, so that we can re-derive a to_write if/when someone else
    // writes to persist and we notice because of an upper conflict.
    // This is optimized for the case where we rarely have more than one
    // writer.
    //
    // We can optimize for a multi-writer case by keeping an open
    // ReadHandle and continually reading updates from persist, updating
    // a desired in place. Similar to the self-correcting persist_sink.
    desired: Vec<(Row, Diff)>,

    /// Updates that we have to write when next writing to persist. This is
    /// determined by looking at what is desired and what is in persist.
    to_write: Vec<(Row, Diff)>,

    /// Current upper of the persist shard. We keep track of this so that we
    /// realize when someone else writes to the shard, in which case we have to
    /// update our state of the world, that is update our `to_write` based on
    /// `desired` and the contents of the persist shard.
    current_upper: T,
}
548
549impl<T, R> DifferentialWriteTask<T, R>
550where
551    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
552    R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>>
553        + Send
554        + Sync
555        + 'static,
556{
557    /// Spawns a [`DifferentialWriteTask`] in an [`mz_ore::task`] and returns
558    /// handles for interacting with it.
559    fn spawn(
560        id: GlobalId,
561        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
562        read_handle_fn: R,
563        read_only: bool,
564        now: NowFn,
565        introspection_config: DifferentialIntrospectionConfig<T>,
566    ) -> (DifferentialWriteChannel<T>, WriteTask, ShutdownSender) {
567        let (tx, rx) = mpsc::unbounded_channel();
568        let (shutdown_tx, shutdown_rx) = oneshot::channel();
569
570        let upper_tick_interval = tokio::time::interval(Duration::from_millis(DEFAULT_TICK_MS));
571
572        let current_upper = T::minimum();
573
574        let task = Self {
575            id,
576            write_handle,
577            read_handle_fn,
578            read_only,
579            now,
580            upper_tick_interval,
581            cmd_rx: rx,
582            shutdown_rx,
583            desired: Vec::new(),
584            to_write: Vec::new(),
585            current_upper,
586        };
587
588        let handle = mz_ore::task::spawn(
589            || format!("CollectionManager-differential_write_task-{id}"),
590            async move {
591                if !task.read_only {
592                    task.prepare(introspection_config).await;
593                }
594                let res = task.run().await;
595
596                match res {
597                    ControlFlow::Break(reason) => {
598                        info!("write_task-{} ending: {}", id, reason);
599                    }
600                    c => {
601                        unreachable!(
602                            "cannot break out of the loop with a Continue, but got: {:?}",
603                            c
604                        );
605                    }
606                }
607            },
608        );
609
610        (tx, handle.abort_on_drop(), shutdown_tx)
611    }
612
613    /// Does any work that is required before this background task starts
614    /// writing to the given introspection collection.
615    ///
616    /// This might include consolidation, deleting older entries or seeding
617    /// in-memory state of, say, scrapers, with current collection contents.
618    async fn prepare(&self, introspection_config: DifferentialIntrospectionConfig<T>) {
619        tracing::info!(%self.id, ?introspection_config.introspection_type, "preparing differential introspection collection for writes");
620
621        match introspection_config.introspection_type {
622            IntrospectionType::ShardMapping => {
623                // Done by the `append_shard_mappings` call.
624            }
625            IntrospectionType::Frontiers | IntrospectionType::ReplicaFrontiers => {
626                // Differential collections start with an empty
627                // desired state. No need to manually reset.
628            }
629            IntrospectionType::StorageSourceStatistics => {
630                let prev = snapshot_statistics(
631                    self.id,
632                    introspection_config.recent_upper,
633                    &introspection_config.storage_collections,
634                )
635                .await;
636
637                let scraper_token = statistics::spawn_statistics_scraper(
638                    self.id.clone(),
639                    // These do a shallow copy.
640                    introspection_config.collection_manager,
641                    Arc::clone(&introspection_config.source_statistics),
642                    prev,
643                    introspection_config.statistics_interval.clone(),
644                    introspection_config.statistics_interval_receiver.clone(),
645                    introspection_config.statistics_retention_duration,
646                    introspection_config.metrics,
647                );
648                let web_token = statistics::spawn_webhook_statistics_scraper(
649                    introspection_config.source_statistics,
650                    introspection_config.statistics_interval,
651                    introspection_config.statistics_interval_receiver,
652                );
653
654                // Make sure these are dropped when the controller is
655                // dropped, so that the internal task will stop.
656                introspection_config
657                    .introspection_tokens
658                    .lock()
659                    .expect("poisoned")
660                    .insert(self.id, Box::new((scraper_token, web_token)));
661            }
662            IntrospectionType::StorageSinkStatistics => {
663                let prev = snapshot_statistics(
664                    self.id,
665                    introspection_config.recent_upper,
666                    &introspection_config.storage_collections,
667                )
668                .await;
669
670                let scraper_token = statistics::spawn_statistics_scraper(
671                    self.id.clone(),
672                    introspection_config.collection_manager,
673                    Arc::clone(&introspection_config.sink_statistics),
674                    prev,
675                    introspection_config.statistics_interval,
676                    introspection_config.statistics_interval_receiver,
677                    introspection_config.statistics_retention_duration,
678                    introspection_config.metrics,
679                );
680
681                // Make sure this is dropped when the controller is
682                // dropped, so that the internal task will stop.
683                introspection_config
684                    .introspection_tokens
685                    .lock()
686                    .expect("poisoned")
687                    .insert(self.id, scraper_token);
688            }
689
690            IntrospectionType::ComputeDependencies
691            | IntrospectionType::ComputeOperatorHydrationStatus
692            | IntrospectionType::ComputeMaterializedViewRefreshes
693            | IntrospectionType::ComputeErrorCounts
694            | IntrospectionType::ComputeHydrationTimes => {
695                // Differential collections start with an empty
696                // desired state. No need to manually reset.
697            }
698
699            introspection_type @ IntrospectionType::ReplicaMetricsHistory
700            | introspection_type @ IntrospectionType::WallclockLagHistory
701            | introspection_type @ IntrospectionType::WallclockLagHistogram
702            | introspection_type @ IntrospectionType::PreparedStatementHistory
703            | introspection_type @ IntrospectionType::StatementExecutionHistory
704            | introspection_type @ IntrospectionType::SessionHistory
705            | introspection_type @ IntrospectionType::StatementLifecycleHistory
706            | introspection_type @ IntrospectionType::SqlText
707            | introspection_type @ IntrospectionType::SourceStatusHistory
708            | introspection_type @ IntrospectionType::SinkStatusHistory
709            | introspection_type @ IntrospectionType::PrivatelinkConnectionStatusHistory
710            | introspection_type @ IntrospectionType::ReplicaStatusHistory => {
711                unreachable!("not differential collection: {introspection_type:?}")
712            }
713        }
714    }
715
716    async fn run(mut self) -> ControlFlow<String> {
717        const BATCH_SIZE: usize = 4096;
718        let mut updates = Vec::with_capacity(BATCH_SIZE);
719        loop {
720            tokio::select! {
721                // Prefer sending actual updates over just bumping the upper,
722                // because sending updates also bump the upper.
723                biased;
724
725                // Listen for a shutdown signal so we can gracefully cleanup.
726                _ = &mut self.shutdown_rx => {
727                    self.handle_shutdown();
728
729                    return ControlFlow::Break("graceful shutdown".to_string());
730                }
731
732                // Pull a chunk of queued updates off the channel.
733                count = self.cmd_rx.recv_many(&mut updates, BATCH_SIZE) => {
734                    if count > 0 {
735                        let _ = self.handle_updates(&mut updates).await?;
736                    } else {
737                        // Sender has been dropped, which means the collection
738                        // should have been unregistered, break out of the run
739                        // loop if we weren't already aborted.
740                        return ControlFlow::Break("sender has been dropped".to_string());
741                    }
742                }
743
744                // If we haven't received any updates, then we'll move the upper forward.
745                _ = self.upper_tick_interval.tick() => {
746                    if self.read_only {
747                        // Not bumping uppers while in read-only mode.
748                        continue;
749                    }
750                    let _ = self.tick_upper().await?;
751                },
752            }
753        }
754    }
755
756    async fn tick_upper(&mut self) -> ControlFlow<String> {
757        let now = T::from((self.now)());
758
759        if now <= self.current_upper {
760            // Upper is already further along than current wall-clock time, no
761            // need to bump it.
762            return ControlFlow::Continue(());
763        }
764
765        assert!(!self.read_only);
766        let res = self
767            .write_handle
768            .compare_and_append_batch(
769                &mut [],
770                Antichain::from_elem(self.current_upper.clone()),
771                Antichain::from_elem(now.clone()),
772                true,
773            )
774            .await
775            .expect("valid usage");
776        match res {
777            // All good!
778            Ok(()) => {
779                tracing::debug!(%self.id, "bumped upper of differential collection");
780                self.current_upper = now;
781            }
782            Err(err) => {
783                // Someone else wrote to the collection or bumped the upper. We
784                // need to sync to latest persist state and potentially patch up
785                // our `to_write`, based on what we learn and `desired`.
786
787                let actual_upper = if let Some(ts) = err.current.as_option() {
788                    ts.clone()
789                } else {
790                    return ControlFlow::Break("upper is the empty antichain".to_string());
791                };
792
793                tracing::info!(%self.id, ?actual_upper, expected_upper = ?self.current_upper, "upper mismatch while bumping upper, syncing to persist state");
794
795                self.current_upper = actual_upper;
796
797                self.sync_to_persist().await;
798            }
799        }
800
801        ControlFlow::Continue(())
802    }
803
804    fn handle_shutdown(&mut self) {
805        let mut senders = Vec::new();
806
807        // Prevent new messages from being sent.
808        self.cmd_rx.close();
809
810        // Get as many waiting senders as possible.
811        while let Ok((_batch, sender)) = self.cmd_rx.try_recv() {
812            senders.push(sender);
813        }
814
815        // Notify them that this collection is closed.
816        //
817        // Note: if a task is shutting down, that indicates the source has been
818        // dropped, at which point the identifier is invalid. Returning this
819        // error provides a better user experience.
820        notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));
821    }
822
823    async fn handle_updates(
824        &mut self,
825        batch: &mut Vec<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>,
826    ) -> ControlFlow<String> {
827        // Put in place _some_ rate limiting.
828        let batch_duration_ms = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT;
829
830        let use_batch_now = Instant::now();
831        let min_time_to_complete = use_batch_now + batch_duration_ms;
832
833        tracing::debug!(
834            ?use_batch_now,
835            ?batch_duration_ms,
836            ?min_time_to_complete,
837            "batch duration",
838        );
839
840        let mut responders = Vec::with_capacity(batch.len());
841        for (op, tx) in batch.drain(..) {
842            self.apply_write_op(op);
843            responders.push(tx);
844        }
845
846        // TODO: Maybe don't do it every time?
847        consolidation::consolidate(&mut self.desired);
848        consolidation::consolidate(&mut self.to_write);
849
850        // Reset the interval which is used to periodically bump the uppers
851        // because the uppers will get bumped with the following update.
852        // This makes it such that we will write at most once every
853        // `interval`.
854        //
855        // For example, let's say our `DEFAULT_TICK` interval is 10, so at
856        // `t + 10`, `t + 20`, ... we'll bump the uppers. If we receive an
857        // update at `t + 3` we want to shift this window so we bump the
858        // uppers at `t + 13`, `t + 23`, ... which resetting the interval
859        // accomplishes.
860        self.upper_tick_interval.reset();
861
862        self.write_to_persist(responders).await?;
863
864        // Wait until our artificial latency has completed.
865        //
866        // Note: if writing to persist took longer than `DEFAULT_TICK` this
867        // await will resolve immediately.
868        tokio::time::sleep_until(min_time_to_complete).await;
869
870        ControlFlow::Continue(())
871    }
872
873    /// Apply the given write operation to the `desired`/`to_write` state.
874    fn apply_write_op(&mut self, op: StorageWriteOp) {
875        match op {
876            StorageWriteOp::Append { updates } => {
877                self.desired.extend_from_slice(&updates);
878                self.to_write.extend(updates);
879            }
880            StorageWriteOp::Delete { filter } => {
881                let to_delete = self.desired.extract_if(.., |(row, _)| filter(row));
882                let retractions = to_delete.map(|(row, diff)| (row, -diff));
883                self.to_write.extend(retractions);
884            }
885        }
886    }
887
888    /// Attempt to write what is currently in [Self::to_write] to persist,
889    /// retrying and re-syncing to persist when necessary, that is when the
890    /// upper was not what we expected.
891    async fn write_to_persist(
892        &mut self,
893        responders: Vec<oneshot::Sender<Result<(), StorageError<T>>>>,
894    ) -> ControlFlow<String> {
895        if self.read_only {
896            tracing::debug!(%self.id, "not writing to differential collection: read-only");
897            // Not attempting to write while in read-only mode.
898            return ControlFlow::Continue(());
899        }
900
901        // We'll try really hard to succeed, but eventually stop.
902        //
903        // Note: it's very rare we should ever need to retry, and if we need to
904        // retry it should only take 1 or 2 attempts. We set `max_tries` to be
905        // high though because if we hit some edge case we want to try hard to
906        // commit the data.
907        let retries = Retry::default()
908            .initial_backoff(Duration::from_secs(1))
909            .clamp_backoff(Duration::from_secs(3))
910            .factor(1.25)
911            .max_tries(20)
912            .into_retry_stream();
913        let mut retries = Box::pin(retries);
914
915        loop {
916            // Append updates to persist!
917            let now = T::from((self.now)());
918            let new_upper = std::cmp::max(
919                now,
920                TimestampManipulation::step_forward(&self.current_upper),
921            );
922
923            let updates_to_write = self
924                .to_write
925                .iter()
926                .map(|(row, diff)| {
927                    (
928                        (SourceData(Ok(row.clone())), ()),
929                        self.current_upper.clone(),
930                        diff.into_inner(),
931                    )
932                })
933                .collect::<Vec<_>>();
934
935            assert!(!self.read_only);
936            let res = self
937                .write_handle
938                .compare_and_append(
939                    updates_to_write,
940                    Antichain::from_elem(self.current_upper.clone()),
941                    Antichain::from_elem(new_upper.clone()),
942                )
943                .await
944                .expect("valid usage");
945            match res {
946                // Everything was successful!
947                Ok(()) => {
948                    // Notify all of our listeners.
949                    notify_listeners(responders, || Ok(()));
950
951                    self.current_upper = new_upper;
952
953                    // Very important! This is empty at steady state, while
954                    // desired keeps an in-memory copy of desired state.
955                    self.to_write.clear();
956
957                    tracing::debug!(%self.id, "appended to differential collection");
958
959                    // Break out of the retry loop so we can wait for more data.
960                    break;
961                }
962                // Failed to write to some collections,
963                Err(err) => {
964                    // Someone else wrote to the collection. We need to read
965                    // from persist and update to_write based on that and the
966                    // desired state.
967                    let actual_upper = if let Some(ts) = err.current.as_option() {
968                        ts.clone()
969                    } else {
970                        return ControlFlow::Break("upper is the empty antichain".to_string());
971                    };
972
973                    tracing::info!(%self.id, ?actual_upper, expected_upper = ?self.current_upper, "retrying append for differential collection");
974
975                    // We've exhausted all of our retries, notify listeners and
976                    // break out of the retry loop so we can wait for more data.
977                    if retries.next().await.is_none() {
978                        let invalid_upper = InvalidUpper {
979                            id: self.id,
980                            current_upper: err.current,
981                        };
982                        notify_listeners(responders, || {
983                            Err(StorageError::InvalidUppers(vec![invalid_upper.clone()]))
984                        });
985                        error!(
986                            "exhausted retries when appending to managed collection {}",
987                            self.id
988                        );
989                        break;
990                    }
991
992                    self.current_upper = actual_upper;
993
994                    self.sync_to_persist().await;
995
996                    debug!(
997                        "Retrying invalid-uppers error while appending to differential collection {}",
998                        self.id
999                    );
1000                }
1001            }
1002        }
1003
1004        ControlFlow::Continue(())
1005    }
1006
1007    /// Re-derives [Self::to_write] by looking at [Self::desired] and the
1008    /// current state in persist. We want to insert everything in desired and
1009    /// retract everything in persist. But ideally most of that cancels out in
1010    /// consolidation.
1011    ///
1012    /// To be called when a `compare_and_append` failed because the upper didn't
1013    /// match what we expected.
1014    async fn sync_to_persist(&mut self) {
1015        let mut read_handle = (self.read_handle_fn)().await;
1016        let as_of = self
1017            .current_upper
1018            .step_back()
1019            .unwrap_or_else(|| T::minimum());
1020        let as_of = Antichain::from_elem(as_of);
1021        let snapshot = read_handle.snapshot_and_fetch(as_of).await;
1022
1023        let mut negated_oks = match snapshot {
1024            Ok(contents) => {
1025                let mut snapshot = Vec::with_capacity(contents.len());
1026                for ((data, _), _, diff) in contents {
1027                    let row = data.expect("invalid protobuf data").0.unwrap();
1028                    snapshot.push((row, -Diff::from(diff)));
1029                }
1030                snapshot
1031            }
1032            Err(_) => panic!("read before since"),
1033        };
1034
1035        self.to_write.clear();
1036        self.to_write.extend(self.desired.iter().cloned());
1037        self.to_write.append(&mut negated_oks);
1038        consolidation::consolidate(&mut self.to_write);
1039    }
1040}
1041
/// Introspection-specific inputs for an `AppendOnlyWriteTask`.
///
/// When provided to `AppendOnlyWriteTask::spawn`, the introspection type
/// decides whether the task tracks previous statuses for deduplication, and
/// the remaining fields feed the preparation work (such as truncating old
/// history) that runs before the task starts writing.
pub(crate) struct AppendOnlyIntrospectionConfig<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    /// The kind of introspection collection the task writes to. Must be one
    /// of the append-only types; the task treats differential types as
    /// unreachable.
    pub(crate) introspection_type: IntrospectionType,
    /// Configuration handed to the metrics-history truncation helper.
    pub(crate) config_set: Arc<ConfigSet>,
    /// Storage parameters from which the status-history descriptions are
    /// built.
    pub(crate) parameters: StorageParameters,
    /// Handle to the storage collections, passed on to the truncation
    /// helpers.
    pub(crate) storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
}
1051
/// A task that writes to an append only collection and continuously bumps the upper for the specified
/// collection.
///
/// For status history collections, this task can deduplicate redundant [`Statuses`](Status).
struct AppendOnlyWriteTask<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    /// The collection that we are writing to.
    id: GlobalId,
    /// Persist handle through which updates are appended and the upper is advanced.
    write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
    /// While `true`, the task skips preparation, does not bump uppers, and rejects appends with
    /// [`StorageError::ReadOnly`].
    read_only: bool,
    /// Clock used to pick the timestamps that appends and upper bumps must reach at least.
    now: NowFn,
    /// Batch duration in milliseconds that is applied to user collections (`GlobalId::User`);
    /// all other collections use the default batch duration.
    user_batch_duration_ms: Arc<AtomicU64>,
    /// Receiver for write commands.
    rx: mpsc::UnboundedReceiver<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<T>>>,
    )>,

    /// We have to shut down when receiving from this.
    shutdown_rx: oneshot::Receiver<()>,
    /// If this collection deduplicates statuses, this map is used to track the previous status.
    previous_statuses: Option<BTreeMap<(GlobalId, Option<ReplicaId>), Status>>,
}
1077
1078impl<T> AppendOnlyWriteTask<T>
1079where
1080    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
1081{
1082    /// Spawns an [`AppendOnlyWriteTask`] in an [`mz_ore::task`] that will continuously bump the
1083    /// upper for the specified collection,
1084    /// and append data that is sent via the provided [`mpsc::UnboundedSender`].
1085    ///
1086    /// TODO(parkmycar): One day if we want to customize the tick interval for each collection, that
1087    /// should be done here.
1088    /// TODO(parkmycar): Maybe add prometheus metrics for each collection?
1089    fn spawn(
1090        id: GlobalId,
1091        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
1092        read_only: bool,
1093        now: NowFn,
1094        user_batch_duration_ms: Arc<AtomicU64>,
1095        introspection_config: Option<AppendOnlyIntrospectionConfig<T>>,
1096    ) -> (AppendOnlyWriteChannel<T>, WriteTask, ShutdownSender) {
1097        let (tx, rx) = mpsc::unbounded_channel();
1098        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1099
1100        let previous_statuses: Option<BTreeMap<(GlobalId, Option<ReplicaId>), Status>> =
1101            match introspection_config
1102                .as_ref()
1103                .map(|config| config.introspection_type)
1104            {
1105                Some(IntrospectionType::SourceStatusHistory)
1106                | Some(IntrospectionType::SinkStatusHistory) => Some(BTreeMap::new()),
1107
1108                Some(IntrospectionType::ReplicaMetricsHistory)
1109                | Some(IntrospectionType::WallclockLagHistory)
1110                | Some(IntrospectionType::WallclockLagHistogram)
1111                | Some(IntrospectionType::PrivatelinkConnectionStatusHistory)
1112                | Some(IntrospectionType::ReplicaStatusHistory)
1113                | Some(IntrospectionType::PreparedStatementHistory)
1114                | Some(IntrospectionType::StatementExecutionHistory)
1115                | Some(IntrospectionType::SessionHistory)
1116                | Some(IntrospectionType::StatementLifecycleHistory)
1117                | Some(IntrospectionType::SqlText)
1118                | None => None,
1119
1120                Some(introspection_type @ IntrospectionType::ShardMapping)
1121                | Some(introspection_type @ IntrospectionType::Frontiers)
1122                | Some(introspection_type @ IntrospectionType::ReplicaFrontiers)
1123                | Some(introspection_type @ IntrospectionType::StorageSourceStatistics)
1124                | Some(introspection_type @ IntrospectionType::StorageSinkStatistics)
1125                | Some(introspection_type @ IntrospectionType::ComputeDependencies)
1126                | Some(introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus)
1127                | Some(introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes)
1128                | Some(introspection_type @ IntrospectionType::ComputeErrorCounts)
1129                | Some(introspection_type @ IntrospectionType::ComputeHydrationTimes) => {
1130                    unreachable!("not append-only collection: {introspection_type:?}")
1131                }
1132            };
1133
1134        let mut task = Self {
1135            id,
1136            write_handle,
1137            rx,
1138            shutdown_rx,
1139            read_only,
1140            now,
1141            user_batch_duration_ms,
1142            previous_statuses,
1143        };
1144
1145        let handle = mz_ore::task::spawn(
1146            || format!("CollectionManager-append_only_write_task-{id}"),
1147            async move {
1148                if !task.read_only {
1149                    task.prepare(introspection_config).await;
1150                }
1151                task.run().await;
1152            },
1153        );
1154
1155        (tx, handle.abort_on_drop(), shutdown_tx)
1156    }
1157
1158    /// Does any work that is required before the background task starts
1159    /// writing to the given append only introspection collection.
1160    ///
1161    /// This might include consolidation or deleting older entries.
1162    async fn prepare(&mut self, introspection_config: Option<AppendOnlyIntrospectionConfig<T>>) {
1163        let Some(AppendOnlyIntrospectionConfig {
1164            introspection_type,
1165            config_set,
1166            parameters,
1167            storage_collections,
1168        }) = introspection_config
1169        else {
1170            return;
1171        };
1172        let initial_statuses = match introspection_type {
1173            IntrospectionType::ReplicaMetricsHistory
1174            | IntrospectionType::WallclockLagHistory
1175            | IntrospectionType::WallclockLagHistogram => {
1176                let result = partially_truncate_metrics_history(
1177                    self.id,
1178                    introspection_type,
1179                    &mut self.write_handle,
1180                    config_set,
1181                    self.now.clone(),
1182                    storage_collections,
1183                )
1184                .await;
1185                if let Err(error) = result {
1186                    soft_panic_or_log!(
1187                        "error truncating metrics history: {error} (type={introspection_type:?})"
1188                    );
1189                }
1190                Vec::new()
1191            }
1192
1193            IntrospectionType::PrivatelinkConnectionStatusHistory => {
1194                partially_truncate_status_history(
1195                    self.id,
1196                    IntrospectionType::PrivatelinkConnectionStatusHistory,
1197                    &mut self.write_handle,
1198                    privatelink_status_history_desc(&parameters),
1199                    self.now.clone(),
1200                    &storage_collections,
1201                )
1202                .await;
1203                Vec::new()
1204            }
1205            IntrospectionType::ReplicaStatusHistory => {
1206                partially_truncate_status_history(
1207                    self.id,
1208                    IntrospectionType::ReplicaStatusHistory,
1209                    &mut self.write_handle,
1210                    replica_status_history_desc(&parameters),
1211                    self.now.clone(),
1212                    &storage_collections,
1213                )
1214                .await;
1215                Vec::new()
1216            }
1217
1218            // Note [btv] - we don't truncate these, because that uses
1219            // a huge amount of memory on environmentd startup.
1220            IntrospectionType::PreparedStatementHistory
1221            | IntrospectionType::StatementExecutionHistory
1222            | IntrospectionType::SessionHistory
1223            | IntrospectionType::StatementLifecycleHistory
1224            | IntrospectionType::SqlText => {
1225                // NOTE(aljoscha): We never remove from these
1226                // collections. Someone, at some point needs to
1227                // think about that! Issue:
1228                // https://github.com/MaterializeInc/database-issues/issues/7666
1229                Vec::new()
1230            }
1231
1232            IntrospectionType::SourceStatusHistory => {
1233                let last_status_per_id = partially_truncate_status_history(
1234                    self.id,
1235                    IntrospectionType::SourceStatusHistory,
1236                    &mut self.write_handle,
1237                    source_status_history_desc(&parameters),
1238                    self.now.clone(),
1239                    &storage_collections,
1240                )
1241                .await;
1242
1243                let status_col = MZ_SOURCE_STATUS_HISTORY_DESC
1244                    .get_by_name(&ColumnName::from("status"))
1245                    .expect("schema has not changed")
1246                    .0;
1247
1248                last_status_per_id
1249                    .into_iter()
1250                    .map(|(id, row)| {
1251                        (
1252                            id,
1253                            Status::from_str(
1254                                row.iter()
1255                                    .nth(status_col)
1256                                    .expect("schema has not changed")
1257                                    .unwrap_str(),
1258                            )
1259                            .expect("statuses must be uncorrupted"),
1260                        )
1261                    })
1262                    .collect()
1263            }
1264            IntrospectionType::SinkStatusHistory => {
1265                let last_status_per_id = partially_truncate_status_history(
1266                    self.id,
1267                    IntrospectionType::SinkStatusHistory,
1268                    &mut self.write_handle,
1269                    sink_status_history_desc(&parameters),
1270                    self.now.clone(),
1271                    &storage_collections,
1272                )
1273                .await;
1274
1275                let status_col = MZ_SINK_STATUS_HISTORY_DESC
1276                    .get_by_name(&ColumnName::from("status"))
1277                    .expect("schema has not changed")
1278                    .0;
1279
1280                last_status_per_id
1281                    .into_iter()
1282                    .map(|(id, row)| {
1283                        (
1284                            id,
1285                            Status::from_str(
1286                                row.iter()
1287                                    .nth(status_col)
1288                                    .expect("schema has not changed")
1289                                    .unwrap_str(),
1290                            )
1291                            .expect("statuses must be uncorrupted"),
1292                        )
1293                    })
1294                    .collect()
1295            }
1296
1297            introspection_type @ IntrospectionType::ShardMapping
1298            | introspection_type @ IntrospectionType::Frontiers
1299            | introspection_type @ IntrospectionType::ReplicaFrontiers
1300            | introspection_type @ IntrospectionType::StorageSourceStatistics
1301            | introspection_type @ IntrospectionType::StorageSinkStatistics
1302            | introspection_type @ IntrospectionType::ComputeDependencies
1303            | introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus
1304            | introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes
1305            | introspection_type @ IntrospectionType::ComputeErrorCounts
1306            | introspection_type @ IntrospectionType::ComputeHydrationTimes => {
1307                unreachable!("not append-only collection: {introspection_type:?}")
1308            }
1309        };
1310        if let Some(previous_statuses) = &mut self.previous_statuses {
1311            previous_statuses.extend(initial_statuses);
1312        }
1313    }
1314
    /// Runs the write loop until a shutdown is signalled or all senders have been dropped.
    ///
    /// Each iteration either handles a shutdown request, appends a batch of queued updates, or,
    /// when no updates arrived before the tick interval elapsed, advances the collection's upper.
    /// In read-only mode appends are rejected with [`StorageError::ReadOnly`] and uppers are not
    /// advanced.
    async fn run(mut self) {
        let mut interval = tokio::time::interval(Duration::from_millis(DEFAULT_TICK_MS));

        const BATCH_SIZE: usize = 4096;
        let mut batch: Vec<(Vec<_>, _)> = Vec::with_capacity(BATCH_SIZE);

        'run: loop {
            tokio::select! {
                // Prefer sending actual updates over just bumping the upper, because sending
                // updates also bump the upper.
                biased;

                // Listen for a shutdown signal so we can gracefully cleanup.
                _ = &mut self.shutdown_rx => {
                    let mut senders = Vec::new();

                    // Prevent new messages from being sent.
                    self.rx.close();

                    // Get as many waiting senders as possible.
                    while let Ok((_batch, sender)) = self.rx.try_recv() {
                        senders.push(sender);
                    }

                    // Notify them that this collection is closed.
                    //
                    // Note: if a task is shutting down, that indicates the source has been
                    // dropped, at which point the identifier is invalid. Returning this
                    // error provides a better user experience.
                    notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));

                    break 'run;
                }

                // Pull a chunk of queued updates off the channel.
                count = self.rx.recv_many(&mut batch, BATCH_SIZE) => {
                    if count > 0 {
                        // To rate limit appends to persist we add artificial latency, and will
                        // finish no sooner than this instant.
                        let batch_duration_ms = match self.id {
                            GlobalId::User(_) => Duration::from_millis(self.user_batch_duration_ms.load(Ordering::Relaxed)),
                            // For non-user collections, always just use the default.
                            _ => STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT,
                        };
                        let use_batch_now = Instant::now();
                        let min_time_to_complete = use_batch_now + batch_duration_ms;

                        tracing::debug!(
                            ?use_batch_now,
                            ?batch_duration_ms,
                            ?min_time_to_complete,
                            "batch duration",
                        );

                        // Reset the interval which is used to periodically bump the uppers
                        // because the uppers will get bumped with the following update. This
                        // makes it such that we will write at most once every `interval`.
                        //
                        // For example, let's say our `DEFAULT_TICK` interval is 10, so at
                        // `t + 10`, `t + 20`, ... we'll bump the uppers. If we receive an
                        // update at `t + 3` we want to shift this window so we bump the uppers
                        // at `t + 13`, `t + 23`, ... which resetting the interval accomplishes.
                        interval.reset();


                        // Flatten the rows of all queued commands into one append, and keep
                        // the responders so every sender can be notified afterwards.
                        let mut all_rows = Vec::with_capacity(batch.iter().map(|(rows, _)| rows.len()).sum());
                        let mut responders = Vec::with_capacity(batch.len());

                        for (updates, responder) in batch.drain(..) {
                            let rows = self.process_updates(updates);

                            all_rows.extend(rows.map(|(row, diff)| TimestamplessUpdate { row, diff}));
                            responders.push(responder);
                        }

                        // Appends are rejected while in read-only mode. Note that this path
                        // skips the artificial latency below.
                        if self.read_only {
                            tracing::warn!(%self.id, ?all_rows, "append while in read-only mode");
                            notify_listeners(responders, || Err(StorageError::ReadOnly));
                            continue;
                        }

                        // Append updates to persist!
                        let at_least = T::from((self.now)());

                        if !all_rows.is_empty() {
                            monotonic_append(&mut self.write_handle, all_rows, at_least).await;
                        }
                        // Notify all of our listeners.
                        notify_listeners(responders, || Ok(()));

                        // Wait until our artificial latency has completed.
                        //
                        // Note: if writing to persist took longer than the batch duration,
                        // this await will resolve immediately.
                        tokio::time::sleep_until(min_time_to_complete).await;
                    } else {
                        // Sender has been dropped, which means the collection should have been
                        // unregistered, break out of the run loop if we weren't already
                        // aborted.
                        break 'run;
                    }
                }

                // If we haven't received any updates, then we'll move the upper forward.
                _ = interval.tick() => {
                    if self.read_only {
                        // Not bumping uppers while in read-only mode.
                        continue;
                    }

                    // Update our collection.
                    let now = T::from((self.now)());
                    let updates = vec![];
                    let at_least = now.clone();

                    // Failures don't matter when advancing collections' uppers. This might
                    // fail when a clusterd happens to be writing to this concurrently.
                    // Advancing uppers here is best-effort and only needs to succeed if no
                    // one else is advancing it; contention proves otherwise.
                    monotonic_append(&mut self.write_handle, updates, at_least).await;
                },
            }
        }

        info!("write_task-{} ending", self.id);
    }
1441
1442    /// Deduplicate any [`mz_storage_client::client::StatusUpdate`] within `updates` and converts
1443    /// `updates` to rows and diffs.
1444    fn process_updates(
1445        &mut self,
1446        updates: Vec<AppendOnlyUpdate>,
1447    ) -> impl Iterator<Item = (Row, Diff)> {
1448        let updates = if let Some(previous_statuses) = &mut self.previous_statuses {
1449            let new: Vec<_> = updates
1450                .into_iter()
1451                .filter(|r| match r {
1452                    AppendOnlyUpdate::Row(_) => true,
1453                    AppendOnlyUpdate::Status(update) => {
1454                        match (
1455                            previous_statuses
1456                                .get(&(update.id, update.replica_id))
1457                                .as_deref(),
1458                            &update.status,
1459                        ) {
1460                            (None, _) => true,
1461                            (Some(old), new) => old.superseded_by(*new),
1462                        }
1463                    }
1464                })
1465                .collect();
1466            previous_statuses.extend(new.iter().filter_map(|update| match update {
1467                AppendOnlyUpdate::Row(_) => None,
1468                AppendOnlyUpdate::Status(update) => {
1469                    Some(((update.id, update.replica_id), update.status))
1470                }
1471            }));
1472            new
1473        } else {
1474            updates
1475        };
1476
1477        updates.into_iter().map(AppendOnlyUpdate::into_row)
1478    }
1479}
1480
1481/// Truncates the given metrics history by removing all entries older than that history's
1482/// configured retention interval.
1483///
1484/// # Panics
1485///
1486/// Panics if `collection` is not a metrics history.
1487async fn partially_truncate_metrics_history<T>(
1488    id: GlobalId,
1489    introspection_type: IntrospectionType,
1490    write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
1491    config_set: Arc<ConfigSet>,
1492    now: NowFn,
1493    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
1494) -> Result<(), anyhow::Error>
1495where
1496    T: Codec64 + From<EpochMillis> + TimestampManipulation,
1497{
1498    let (keep_duration, occurred_at_col) = match introspection_type {
1499        IntrospectionType::ReplicaMetricsHistory => (
1500            REPLICA_METRICS_HISTORY_RETENTION_INTERVAL.get(&config_set),
1501            REPLICA_METRICS_HISTORY_DESC
1502                .get_by_name(&ColumnName::from("occurred_at"))
1503                .expect("schema has not changed")
1504                .0,
1505        ),
1506        IntrospectionType::WallclockLagHistory => (
1507            WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL.get(&config_set),
1508            WALLCLOCK_LAG_HISTORY_DESC
1509                .get_by_name(&ColumnName::from("occurred_at"))
1510                .expect("schema has not changed")
1511                .0,
1512        ),
1513        IntrospectionType::WallclockLagHistogram => (
1514            WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL.get(&config_set),
1515            WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC
1516                .get_by_name(&ColumnName::from("period_start"))
1517                .expect("schema has not changed")
1518                .0,
1519        ),
1520        _ => panic!("not a metrics history: {introspection_type:?}"),
1521    };
1522
1523    let upper = write_handle.fetch_recent_upper().await;
1524    let Some(upper_ts) = upper.as_option() else {
1525        bail!("collection is sealed");
1526    };
1527    let Some(as_of_ts) = upper_ts.step_back() else {
1528        return Ok(()); // nothing to truncate
1529    };
1530
1531    let mut rows = storage_collections
1532        .snapshot_cursor(id, as_of_ts)
1533        .await
1534        .map_err(|e| anyhow!("reading snapshot: {e:?}"))?;
1535
1536    let now = mz_ore::now::to_datetime(now());
1537    let keep_since = now - keep_duration;
1538
1539    // It is very important that we append our retractions at the timestamp
1540    // right after the timestamp at which we got our snapshot. Otherwise,
1541    // it's possible for someone else to sneak in retractions or other
1542    // unexpected changes.
1543    let old_upper_ts = upper_ts.clone();
1544    let new_upper_ts = TimestampManipulation::step_forward(&old_upper_ts);
1545
1546    // Produce retractions by inverting diffs of rows we want to delete.
1547    let mut builder = write_handle.builder(Antichain::from_elem(old_upper_ts.clone()));
1548    while let Some(chunk) = rows.next().await {
1549        for ((key, _v), _t, diff) in chunk {
1550            let data = key.map_err(|e| anyhow!("decoding error in metrics snapshot: {e}"))?;
1551            let Ok(row) = &data.0 else { continue };
1552            let datums = row.unpack();
1553            let occurred_at = datums[occurred_at_col].unwrap_timestamptz();
1554            if *occurred_at >= keep_since {
1555                continue;
1556            }
1557            let diff = -diff;
1558            match builder.add(&data, &(), &old_upper_ts, &diff).await? {
1559                Added::Record => {}
1560                Added::RecordAndParts => {
1561                    debug!(?id, "added part to builder");
1562                }
1563            }
1564        }
1565    }
1566
1567    let mut updates = builder
1568        .finish(Antichain::from_elem(new_upper_ts.clone()))
1569        .await?;
1570    let mut batches = vec![&mut updates];
1571
1572    write_handle
1573        .compare_and_append_batch(
1574            batches.as_mut_slice(),
1575            Antichain::from_elem(old_upper_ts),
1576            Antichain::from_elem(new_upper_ts),
1577            true,
1578        )
1579        .await
1580        .expect("valid usage")
1581        .map_err(|e| anyhow!("appending retractions: {e:?}"))
1582}
1583
1584/// Effectively truncates the status history shard based on its retention policy.
1585///
1586/// NOTE: The history collections are really append-only collections, but
1587/// every-now-and-then we want to retract old updates so that the collection
1588/// does not grow unboundedly. Crucially, these are _not_ incremental
1589/// collections, they are not derived from a state at some time `t` and we
1590/// cannot maintain a desired state for them.
1591///
1592/// Returns a map with latest unpacked row per key.
1593pub(crate) async fn partially_truncate_status_history<T, K>(
1594    id: GlobalId,
1595    introspection_type: IntrospectionType,
1596    write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
1597    status_history_desc: StatusHistoryDesc<K>,
1598    now: NowFn,
1599    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
1600) -> BTreeMap<K, Row>
1601where
1602    T: Codec64 + From<EpochMillis> + TimestampManipulation,
1603    K: Clone + Debug + Ord + Send + Sync,
1604{
1605    let upper = write_handle.fetch_recent_upper().await.clone();
1606
1607    let mut rows = match upper.as_option() {
1608        Some(f) if f > &T::minimum() => {
1609            let as_of = f.step_back().unwrap();
1610
1611            storage_collections
1612                .snapshot(id, as_of)
1613                .await
1614                .expect("snapshot succeeds")
1615        }
1616        // If collection is closed or the frontier is the minimum, we cannot
1617        // or don't need to truncate (respectively).
1618        _ => return BTreeMap::new(),
1619    };
1620
1621    // BTreeMap to keep track of the row with the latest timestamp for each key.
1622    let mut latest_row_per_key: BTreeMap<K, (CheckedTimestamp<DateTime<Utc>>, Row)> =
1623        BTreeMap::new();
1624
1625    // Consolidate the snapshot, so we can process it correctly below.
1626    differential_dataflow::consolidation::consolidate(&mut rows);
1627
1628    let mut deletions = vec![];
1629
1630    let mut handle_row = {
1631        let latest_row_per_key = &mut latest_row_per_key;
1632        move |row: &Row, diff| {
1633            let datums = row.unpack();
1634            let key = (status_history_desc.extract_key)(&datums);
1635            let timestamp = (status_history_desc.extract_time)(&datums);
1636
1637            assert!(
1638                diff > 0,
1639                "only know how to operate over consolidated data with diffs > 0, \
1640                    found diff {diff} for object {key:?} in {introspection_type:?}",
1641            );
1642
1643            // Keep track of the timestamp of the latest row per key.
1644            match latest_row_per_key.get(&key) {
1645                Some(existing) if &existing.0 > &timestamp => {}
1646                _ => {
1647                    latest_row_per_key.insert(key.clone(), (timestamp, row.clone()));
1648                }
1649            };
1650            (key, timestamp)
1651        }
1652    };
1653
1654    match status_history_desc.retention_policy {
1655        StatusHistoryRetentionPolicy::LastN(n) => {
1656            // BTreeMap to track the earliest events for each key.
1657            let mut last_n_entries_per_key: BTreeMap<
1658                K,
1659                BinaryHeap<Reverse<(CheckedTimestamp<DateTime<Utc>>, Row)>>,
1660            > = BTreeMap::new();
1661
1662            for (row, diff) in rows {
1663                let (key, timestamp) = handle_row(&row, diff);
1664
1665                // Duplicate rows ARE possible if many status changes happen in VERY quick succession,
1666                // so we handle duplicated rows separately.
1667                let entries = last_n_entries_per_key.entry(key).or_default();
1668                for _ in 0..diff {
1669                    // We CAN have multiple statuses (most likely Starting and Running) at the exact same
1670                    // millisecond, depending on how the `health_operator` is scheduled.
1671                    //
1672                    // Note that these will be arbitrarily ordered, so a Starting event might
1673                    // survive and a Running one won't. The next restart will remove the other,
1674                    // so we don't bother being careful about it.
1675                    //
1676                    // TODO(guswynn): unpack these into health-status objects and use
1677                    // their `Ord` impl.
1678                    entries.push(Reverse((timestamp, row.clone())));
1679
1680                    // Retain some number of entries, using pop to mark the oldest entries for
1681                    // deletion.
1682                    while entries.len() > n {
1683                        if let Some(Reverse((_, r))) = entries.pop() {
1684                            deletions.push(r);
1685                        }
1686                    }
1687                }
1688            }
1689        }
1690        StatusHistoryRetentionPolicy::TimeWindow(time_window) => {
1691            // Get the lower bound of our retention window
1692            let now = mz_ore::now::to_datetime(now());
1693            let keep_since = now - time_window;
1694
1695            // Mark any row outside the retention window for deletion
1696            for (row, diff) in rows {
1697                let (_, timestamp) = handle_row(&row, diff);
1698
1699                if *timestamp < keep_since {
1700                    deletions.push(row);
1701                }
1702            }
1703        }
1704    }
1705
1706    // It is very important that we append our retractions at the timestamp
1707    // right after the timestamp at which we got our snapshot. Otherwise,
1708    // it's possible for someone else to sneak in retractions or other
1709    // unexpected changes.
1710    let expected_upper = upper.into_option().expect("checked above");
1711    let new_upper = TimestampManipulation::step_forward(&expected_upper);
1712
1713    // Updates are only deletes because everything else is already in the shard.
1714    let updates = deletions
1715        .into_iter()
1716        .map(|row| ((SourceData(Ok(row)), ()), expected_upper.clone(), -1))
1717        .collect::<Vec<_>>();
1718
1719    let res = write_handle
1720        .compare_and_append(
1721            updates,
1722            Antichain::from_elem(expected_upper.clone()),
1723            Antichain::from_elem(new_upper),
1724        )
1725        .await
1726        .expect("usage was valid");
1727
1728    match res {
1729        Ok(_) => {
1730            // All good, yay!
1731        }
1732        Err(err) => {
1733            // This is fine, it just means the upper moved because
1734            // of continual upper advancement or because someone
1735            // already appended some more retractions/updates.
1736            //
1737            // NOTE: We might want to attempt these partial
1738            // retractions on an interval, instead of only when
1739            // starting up!
1740            info!(
1741                %id, ?expected_upper, current_upper = ?err.current,
1742                "failed to append partial truncation",
1743            );
1744        }
1745    }
1746
1747    latest_row_per_key
1748        .into_iter()
1749        .map(|(key, (_, row))| (key, row))
1750        .collect()
1751}
1752
1753async fn monotonic_append<T: Timestamp + Lattice + Codec64 + TimestampManipulation>(
1754    write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
1755    updates: Vec<TimestamplessUpdate>,
1756    at_least: T,
1757) {
1758    let mut expected_upper = write_handle.shared_upper();
1759    loop {
1760        if updates.is_empty() && expected_upper.is_empty() {
1761            // Ignore timestamp advancement for
1762            // closed collections. TODO? Make this a
1763            // correctable error
1764            return;
1765        }
1766
1767        let upper = expected_upper
1768            .into_option()
1769            .expect("cannot append data to closed collection");
1770
1771        let lower = if upper.less_than(&at_least) {
1772            at_least.clone()
1773        } else {
1774            upper.clone()
1775        };
1776
1777        let new_upper = TimestampManipulation::step_forward(&lower);
1778        let updates = updates
1779            .iter()
1780            .map(|TimestamplessUpdate { row, diff }| {
1781                (
1782                    (SourceData(Ok(row.clone())), ()),
1783                    lower.clone(),
1784                    diff.into_inner(),
1785                )
1786            })
1787            .collect::<Vec<_>>();
1788        let res = write_handle
1789            .compare_and_append(
1790                updates,
1791                Antichain::from_elem(upper),
1792                Antichain::from_elem(new_upper),
1793            )
1794            .await
1795            .expect("valid usage");
1796        match res {
1797            Ok(()) => return,
1798            Err(err) => {
1799                expected_upper = err.current;
1800                continue;
1801            }
1802        }
1803    }
1804}
1805
1806// Helper method for notifying listeners.
1807fn notify_listeners<T>(
1808    responders: impl IntoIterator<Item = oneshot::Sender<T>>,
1809    result: impl Fn() -> T,
1810) {
1811    for r in responders {
1812        // We don't care if the listener disappeared.
1813        let _ = r.send(result());
1814    }
1815}
1816
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use super::*;
    use itertools::Itertools;
    use mz_repr::{Datum, Row};
    use mz_storage_client::client::StatusUpdate;
    use mz_storage_client::healthcheck::{
        MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC,
    };

    /// Packs a [`StatusUpdate`] without replica, timestamped "now", into a row.
    fn status_row(
        id: GlobalId,
        status: Status,
        error: Option<&str>,
        hints: &[&str],
        namespaced_errors: &[(&str, &str)],
    ) -> Row {
        Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: error.map(|message| message.to_string()),
            hints: hints
                .iter()
                .map(|hint| hint.to_string())
                .collect::<BTreeSet<_>>(),
            namespaced_errors: namespaced_errors
                .iter()
                .map(|(namespace, message)| (namespace.to_string(), message.to_string()))
                .collect(),
            replica_id: None,
        })
    }

    /// Asserts that `row` fits both the sink and the source status history
    /// schema, column by column.
    fn assert_matches_status_schemas(row: &Row) {
        for (datum, column_type) in row.iter().zip_eq(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of(column_type));
        }

        for (datum, column_type) in row
            .iter()
            .zip_eq(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types())
        {
            assert!(datum.is_instance_of(column_type));
        }
    }

    /// Asserts the id, status and error columns (indexes 1 through 3).
    fn assert_leading_columns(row: &Row, id: GlobalId, status: Status, error: Datum<'_>) {
        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
        assert_eq!(row.iter().nth(3).unwrap(), error);
    }

    /// Collects the entries of the details map stored in column 4.
    fn details_entries(row: &Row) -> Vec<(&str, Datum<'_>)> {
        row.iter()
            .nth(4)
            .unwrap()
            .unwrap_map()
            .iter()
            .collect::<Vec<_>>()
    }

    /// Asserts that `entry` is the "hints" entry and that its first hint is `hint`.
    fn assert_hints_entry(entry: &(&str, Datum<'_>), hint: &str) {
        assert_eq!(entry.0, "hints");
        assert_eq!(
            entry.1.unwrap_list().iter().next().unwrap(),
            Datum::String(hint)
        );
    }

    /// Asserts that `entry` is the "namespaced" entry whose first element maps
    /// "thing" to "error".
    fn assert_namespaced_entry(entry: &(&str, Datum<'_>)) {
        assert_eq!(entry.0, "namespaced");
        assert_eq!(
            entry.1.unwrap_map().iter().next().unwrap(),
            ("thing", Datum::String("error"))
        );
    }

    #[mz_ore::test]
    fn test_row() {
        let error_message = "error message";
        let hint = "hint message";
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let row = status_row(id, status, Some(error_message), &[hint], &[]);

        assert_matches_status_schemas(&row);
        assert_leading_columns(&row, id, status, Datum::String(error_message));

        let details = details_entries(&row);
        assert_eq!(details.len(), 1);
        assert_hints_entry(&details[0], hint);
    }

    #[mz_ore::test]
    fn test_row_without_hint() {
        let error_message = "error message";
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let row = status_row(id, status, Some(error_message), &[], &[]);

        assert_matches_status_schemas(&row);
        assert_leading_columns(&row, id, status, Datum::String(error_message));

        // Without hints or namespaced errors there are no details at all.
        assert_eq!(row.iter().nth(4).unwrap(), Datum::Null);
    }

    #[mz_ore::test]
    fn test_row_without_error() {
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let hint = "hint message";
        let row = status_row(id, status, None, &[hint], &[]);

        assert_matches_status_schemas(&row);
        assert_leading_columns(&row, id, status, Datum::Null);

        let details = details_entries(&row);
        assert_eq!(details.len(), 1);
        assert_hints_entry(&details[0], hint);
    }

    #[mz_ore::test]
    fn test_row_with_namespaced() {
        let error_message = "error message";
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let row = status_row(id, status, Some(error_message), &[], &[("thing", "error")]);

        assert_matches_status_schemas(&row);
        assert_leading_columns(&row, id, status, Datum::String(error_message));

        let details = details_entries(&row);
        assert_eq!(details.len(), 1);
        assert_namespaced_entry(&details[0]);
    }

    #[mz_ore::test]
    fn test_row_with_everything() {
        let error_message = "error message";
        let hint = "hint message";
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let row = status_row(
            id,
            status,
            Some(error_message),
            &[hint],
            &[("thing", "error")],
        );

        assert_matches_status_schemas(&row);
        assert_leading_columns(&row, id, status, Datum::String(error_message));

        let details = details_entries(&row);
        assert_eq!(details.len(), 2);
        // These are always sorted
        assert_hints_entry(&details[0], hint);
        assert_namespaced_entry(&details[1]);
    }
}