Skip to main content

mz_storage_controller/
collection_mgmt.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Tokio tasks (and support machinery) for maintaining storage-managed
11//! collections.
12//!
13//! We differentiate between append-only collections and differential
14//! collections. The intent is that knowing the type allows being more
15//! intentional about what state we keep in memory and how we work when in
16//! read-only mode / during zero-downtime upgrades.
17//!
18//! ## Append-only collections
19//!
20//! Writers only append blind writes. Those writes never fail. It does not
21//! matter at what timestamp they happen (to a degree, but ...).
22//!
23//! While in read-only mode, the append-only write task can immediately write
24//! updates out as batches, but only append them when going out of read-only
25//! mode.
26//!
27//! ## Differential collections
28//!
29//! These are very similar to the self-correcting persist_sink. We have an
30//! in-memory desired state and continually make it so that persist matches
31//! desired. As described below (in the task implementation), we could do this
32//! in a memory efficient way by keeping open a persist read handle and
33//! continually updating/consolidating our desired collection. This way, we
34//! would be memory-efficient even in read-only mode.
35//!
36//! This is an evolution of the current design where, on startup, we bring the
37//! persist collection into a known state (mostly by retracting everything) and
38//! then assume that this `envd` is the only writer. We panic when that is ever
39//! not the case, which we notice when the upper of a collection changes
40//! unexpectedly. With this new design we can instead continually update our
41//! view of the persist shard and emit updates when needed, when desired
42//! changed.
43//!
44//! NOTE: As it is, we always keep all of desired in memory. Only when told to
45//! go out of read-only mode would we start attempting to write.
46//!
47//! ## Read-only mode
48//!
49//! When [`CollectionManager`] is in read-only mode it cannot write out to
50//! persist. It will, however, maintain the `desired` state of differential
51//! collections so that we can immediately start writing out updates when going
52//! out of read-only mode.
53//!
54//! For append-only collections we either panic, in the case of
55//! [`CollectionManager::blind_write`], or report back a
56//! [`StorageError::ReadOnly`] when trying to append through a
57//! [`MonotonicAppender`] returned from
58//! [`CollectionManager::monotonic_appender`].
59
60use std::any::Any;
61use std::cmp::Reverse;
62use std::collections::{BTreeMap, BinaryHeap};
63use std::fmt::Debug;
64use std::ops::ControlFlow;
65use std::pin::Pin;
66use std::str::FromStr;
67use std::sync::atomic::{AtomicU64, Ordering};
68use std::sync::{Arc, Mutex};
69
70use anyhow::{anyhow, bail};
71use chrono::{DateTime, Utc};
72use differential_dataflow::consolidation;
73use futures::future::BoxFuture;
74use futures::stream::StreamExt;
75use futures::{Future, FutureExt};
76use mz_cluster_client::ReplicaId;
77use mz_dyncfg::ConfigSet;
78use mz_ore::now::NowFn;
79use mz_ore::retry::Retry;
80use mz_ore::soft_panic_or_log;
81use mz_ore::task::AbortOnDropHandle;
82use mz_persist_client::batch::Added;
83use mz_persist_client::read::ReadHandle;
84use mz_persist_client::write::WriteHandle;
85use mz_repr::adt::timestamp::CheckedTimestamp;
86use mz_repr::{ColumnName, DatumVec, Diff, GlobalId, Row, Timestamp};
87use mz_storage_client::client::{AppendOnlyUpdate, Status, TimestamplessUpdate};
88use mz_storage_client::controller::{IntrospectionType, MonotonicAppender, StorageWriteOp};
89use mz_storage_client::healthcheck::{
90    MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_METRICS_HISTORY_DESC,
91    WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC, WALLCLOCK_LAG_HISTORY_DESC,
92};
93use mz_storage_client::metrics::StorageControllerMetrics;
94use mz_storage_client::statistics::ControllerSinkStatistics;
95use mz_storage_client::storage_collections::StorageCollections;
96use mz_storage_types::StorageDiff;
97use mz_storage_types::controller::InvalidUpper;
98use mz_storage_types::dyncfgs::{
99    REPLICA_METRICS_HISTORY_RETENTION_INTERVAL, WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL,
100    WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL,
101};
102use mz_storage_types::parameters::{
103    STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT, StorageParameters,
104};
105use mz_storage_types::sources::SourceData;
106use timely::progress::Antichain;
107use tokio::sync::{mpsc, oneshot, watch};
108use tokio::time::{Duration, Instant};
109use tracing::{debug, error, info};
110
111use crate::{
112    StatusHistoryDesc, StatusHistoryRetentionPolicy, StorageError, collection_mgmt,
113    privatelink_status_history_desc, replica_status_history_desc, sink_status_history_desc,
114    snapshot_statistics, source_status_history_desc, statistics,
115};
116
117// Default rate at which we advance the uppers of managed collections.
118const DEFAULT_TICK_MS: u64 = 1_000;
119
120/// A channel for sending writes to a differential collection.
121type DifferentialWriteChannel =
122    mpsc::UnboundedSender<(StorageWriteOp, oneshot::Sender<Result<(), StorageError>>)>;
123
124/// A channel for sending writes to an append-only collection.
125type AppendOnlyWriteChannel = mpsc::UnboundedSender<(
126    Vec<AppendOnlyUpdate>,
127    oneshot::Sender<Result<(), StorageError>>,
128)>;
129
130type WriteTask = AbortOnDropHandle<()>;
131type ShutdownSender = oneshot::Sender<()>;
132
/// Types of storage-managed/introspection collections:
///
/// Append-only: Only accepts blind writes, that is, writes that can be applied
/// at any timestamp and don't depend on current collection contents.
///
/// Pseudo append-only: We treat them largely as append-only collections but
/// periodically (currently on bootstrap) retract old updates from them.
///
/// Differential: at any given time `t`, collection contents mirror some
/// (small cardinality) state. The cardinality of the collection stays constant
/// if the thing that is mirrored doesn't change in cardinality. At steady
/// state, updates always come in pairs of retractions/additions.
///
/// The enum is a plain tag, so it derives the common value-type traits: it can
/// be debug-printed, copied freely and compared for equality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectionManagerKind {
    /// The collection only accepts blind writes.
    AppendOnly,
    /// The collection mirrors a desired state that is continually reconciled.
    Differential,
}
149
150#[derive(Debug, Clone)]
151pub struct CollectionManager {
152    /// When a [`CollectionManager`] is in read-only mode it must not affect any
153    /// changes to external state.
154    read_only: bool,
155
156    // WIP: Name TBD! I thought about `managed_collections`, `ivm_collections`,
157    // `self_correcting_collections`.
158    /// These are collections that we write to by adding/removing updates to an
159    /// internal _desired_ collection. The `CollectionManager` continually makes
160    /// sure that collection contents (in persist) match the desired state.
161    differential_collections:
162        Arc<Mutex<BTreeMap<GlobalId, (DifferentialWriteChannel, WriteTask, ShutdownSender)>>>,
163
164    /// Collections that we only append to using blind-writes.
165    ///
166    /// Every write succeeds at _some_ timestamp, and we never check what the
167    /// actual contents of the collection (in persist) are.
168    append_only_collections:
169        Arc<Mutex<BTreeMap<GlobalId, (AppendOnlyWriteChannel, WriteTask, ShutdownSender)>>>,
170
171    /// Amount of time we'll wait before sending a batch of inserts to Persist, for user
172    /// collections.
173    user_batch_duration_ms: Arc<AtomicU64>,
174    now: NowFn,
175}
176
177/// The `CollectionManager` provides two complementary functions:
178/// - Providing an API to append values to a registered set of collections.
179///   For this usecase:
180///     - The `CollectionManager` expects to be the only writer.
181///     - Appending to a closed collection panics
182/// - Automatically advancing the timestamp of managed collections every
183///   second. For this usecase:
184///     - The `CollectionManager` handles contention by permitting and ignoring errors.
185///     - Closed collections will not panic if they continue receiving these requests.
186impl CollectionManager {
187    pub(super) fn new(read_only: bool, now: NowFn) -> CollectionManager {
188        let batch_duration_ms: u64 = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT
189            .as_millis()
190            .try_into()
191            .expect("known to fit");
192
193        CollectionManager {
194            read_only,
195            differential_collections: Arc::new(Mutex::new(BTreeMap::new())),
196            append_only_collections: Arc::new(Mutex::new(BTreeMap::new())),
197            user_batch_duration_ms: Arc::new(AtomicU64::new(batch_duration_ms)),
198            now,
199        }
200    }
201
202    /// Updates the duration we'll wait to batch events for user owned collections.
203    pub fn update_user_batch_duration(&self, duration: Duration) {
204        tracing::info!(?duration, "updating user batch duration");
205        let millis: u64 = duration.as_millis().try_into().unwrap_or(u64::MAX);
206        self.user_batch_duration_ms.store(millis, Ordering::Relaxed);
207    }
208
209    /// Registers a new _differential collection_.
210    ///
211    /// The [CollectionManager] will automatically advance the upper of every
212    /// registered collection every second.
213    ///
214    /// Update the `desired` state of a differential collection using
215    /// [Self::differential_write].
216    pub(super) fn register_differential_collection<R>(
217        &self,
218        id: GlobalId,
219        write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
220        read_handle_fn: R,
221        force_writable: bool,
222        introspection_config: DifferentialIntrospectionConfig,
223    ) where
224        R: FnMut() -> Pin<
225                Box<dyn Future<Output = ReadHandle<SourceData, (), Timestamp, StorageDiff>> + Send>,
226            > + Send
227            + Sync
228            + 'static,
229    {
230        let mut guard = self
231            .differential_collections
232            .lock()
233            .expect("collection_mgmt panicked");
234
235        // Check if this collection is already registered.
236        if let Some((_writer, task, _shutdown_tx)) = guard.get(&id) {
237            // The collection is already registered and the task is still running so nothing to do.
238            if !task.is_finished() {
239                // TODO(parkmycar): Panic here if we never see this error in production.
240                tracing::error!("Registered a collection twice! {id:?}");
241                return;
242            }
243        }
244
245        let read_only = self.get_read_only(id, force_writable);
246
247        // Spawns a new task so we can write to this collection.
248        let writer_and_handle = DifferentialWriteTask::spawn(
249            id,
250            write_handle,
251            read_handle_fn,
252            read_only,
253            self.now.clone(),
254            introspection_config,
255        );
256        let prev = guard.insert(id, writer_and_handle);
257
258        // Double check the previous task was actually finished.
259        if let Some((_, prev_task, _)) = prev {
260            assert!(
261                prev_task.is_finished(),
262                "should only spawn a new task if the previous is finished"
263            );
264        }
265    }
266
267    /// Registers a new _append-only collection_.
268    ///
269    /// The [CollectionManager] will automatically advance the upper of every
270    /// registered collection every second.
271    pub(super) fn register_append_only_collection(
272        &self,
273        id: GlobalId,
274        write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
275        force_writable: bool,
276        introspection_config: Option<AppendOnlyIntrospectionConfig>,
277    ) {
278        let mut guard = self
279            .append_only_collections
280            .lock()
281            .expect("collection_mgmt panicked");
282
283        // Check if this collection is already registered.
284        if let Some((_writer, task, _shutdown_tx)) = guard.get(&id) {
285            // The collection is already registered and the task is still running so nothing to do.
286            if !task.is_finished() {
287                // TODO(parkmycar): Panic here if we never see this error in production.
288                tracing::error!("Registered a collection twice! {id:?}");
289                return;
290            }
291        }
292
293        let read_only = self.get_read_only(id, force_writable);
294
295        // Spawns a new task so we can write to this collection.
296        let writer_and_handle = AppendOnlyWriteTask::spawn(
297            id,
298            write_handle,
299            read_only,
300            self.now.clone(),
301            Arc::clone(&self.user_batch_duration_ms),
302            introspection_config,
303        );
304        let prev = guard.insert(id, writer_and_handle);
305
306        // Double check the previous task was actually finished.
307        if let Some((_, prev_task, _)) = prev {
308            assert!(
309                prev_task.is_finished(),
310                "should only spawn a new task if the previous is finished"
311            );
312        }
313    }
314
315    /// Unregisters the given collection.
316    ///
317    /// Also waits until the `CollectionManager` has completed all outstanding work to ensure that
318    /// it has stopped referencing the provided `id`.
319    #[mz_ore::instrument(level = "debug")]
320    pub(super) fn unregister_collection(&self, id: GlobalId) -> BoxFuture<'static, ()> {
321        let prev = self
322            .differential_collections
323            .lock()
324            .expect("CollectionManager panicked")
325            .remove(&id);
326
327        // Wait for the task to complete before reporting as unregisted.
328        if let Some((_prev_writer, prev_task, shutdown_tx)) = prev {
329            // Notify the task it needs to shutdown.
330            //
331            // We can ignore errors here because they indicate the task is already done.
332            let _ = shutdown_tx.send(());
333            return Box::pin(prev_task.map(|_| ()));
334        }
335
336        let prev = self
337            .append_only_collections
338            .lock()
339            .expect("CollectionManager panicked")
340            .remove(&id);
341
342        // Wait for the task to complete before reporting as unregisted.
343        if let Some((_prev_writer, prev_task, shutdown_tx)) = prev {
344            // Notify the task it needs to shutdown.
345            //
346            // We can ignore errors here because they indicate the task is already done.
347            let _ = shutdown_tx.send(());
348            return Box::pin(prev_task.map(|_| ()));
349        }
350
351        Box::pin(futures::future::ready(()))
352    }
353
354    /// Returns a sender for writes to the given append-only collection.
355    ///
356    /// # Panics
357    /// - If `id` does not belong to an append-only collections.
358    pub(super) fn append_only_write_sender(&self, id: GlobalId) -> AppendOnlyWriteChannel {
359        let collections = self.append_only_collections.lock().expect("poisoned");
360        match collections.get(&id) {
361            Some((tx, _, _)) => tx.clone(),
362            None => panic!("missing append-only collection: {id}"),
363        }
364    }
365
366    /// Returns a sender for writes to the given differential collection.
367    ///
368    /// # Panics
369    /// - If `id` does not belong to a differential collections.
370    pub(super) fn differential_write_sender(&self, id: GlobalId) -> DifferentialWriteChannel {
371        let collections = self.differential_collections.lock().expect("poisoned");
372        match collections.get(&id) {
373            Some((tx, _, _)) => tx.clone(),
374            None => panic!("missing differential collection: {id}"),
375        }
376    }
377
378    /// Appends `updates` to the append-only collection identified by `id`, at
379    /// _some_ timestamp. Does not wait for the append to complete.
380    ///
381    /// # Panics
382    /// - If `id` does not belong to an append-only collections.
383    /// - If this [`CollectionManager`] is in read-only mode.
384    /// - If the collection closed.
385    pub(super) fn blind_write(&self, id: GlobalId, updates: Vec<AppendOnlyUpdate>) {
386        if self.read_only {
387            panic!("attempting blind write to {} while in read-only mode", id);
388        }
389
390        if updates.is_empty() {
391            return;
392        }
393
394        let collections = self.append_only_collections.lock().expect("poisoned");
395        match collections.get(&id) {
396            Some((update_tx, _, _)) => {
397                let (tx, _rx) = oneshot::channel();
398                update_tx.send((updates, tx)).expect("rx hung up");
399            }
400            None => panic!("missing append-only collection: {id}"),
401        }
402    }
403
404    /// Updates the desired collection state of the differential collection identified by
405    /// `id`. The underlying persist shard will reflect this change at
406    /// _some_point. Does not wait for the change to complete.
407    ///
408    /// # Panics
409    /// - If `id` does not belong to a differential collection.
410    /// - If the collection closed.
411    pub(super) fn differential_write(&self, id: GlobalId, op: StorageWriteOp) {
412        if op.is_empty_append() {
413            return;
414        }
415
416        let collections = self.differential_collections.lock().expect("poisoned");
417        match collections.get(&id) {
418            Some((update_tx, _, _)) => {
419                let (tx, _rx) = oneshot::channel();
420                update_tx.send((op, tx)).expect("rx hung up");
421            }
422            None => panic!("missing differential collection: {id}"),
423        }
424    }
425
426    /// Appends the given `updates` to the differential collection identified by `id`.
427    ///
428    /// # Panics
429    /// - If `id` does not belong to a differential collection.
430    /// - If the collection closed.
431    pub(super) fn differential_append(&self, id: GlobalId, updates: Vec<(Row, Diff)>) {
432        self.differential_write(id, StorageWriteOp::Append { updates })
433    }
434
435    /// Returns a [`MonotonicAppender`] that can be used to monotonically append updates to the
436    /// collection correlated with `id`.
437    pub(super) fn monotonic_appender(
438        &self,
439        id: GlobalId,
440    ) -> Result<MonotonicAppender, StorageError> {
441        let guard = self
442            .append_only_collections
443            .lock()
444            .expect("CollectionManager panicked");
445        let tx = guard
446            .get(&id)
447            .map(|(tx, _, _)| tx.clone())
448            .ok_or(StorageError::IdentifierMissing(id))?;
449
450        Ok(MonotonicAppender::new(tx))
451    }
452
453    fn get_read_only(&self, id: GlobalId, force_writable: bool) -> bool {
454        if force_writable {
455            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
456            false
457        } else {
458            self.read_only
459        }
460    }
461}
462
463pub(crate) struct DifferentialIntrospectionConfig {
464    pub(crate) recent_upper: Antichain<Timestamp>,
465    pub(crate) introspection_type: IntrospectionType,
466    pub(crate) storage_collections: Arc<dyn StorageCollections + Send + Sync>,
467    pub(crate) collection_manager: collection_mgmt::CollectionManager,
468    pub(crate) source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
469    pub(crate) sink_statistics:
470        Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
471    pub(crate) statistics_interval: Duration,
472    pub(crate) statistics_interval_receiver: watch::Receiver<Duration>,
473    pub(crate) statistics_retention_duration: Duration,
474    pub(crate) metrics: StorageControllerMetrics,
475    pub(crate) introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,
476}
477
478/// A task that will make it so that the state in persist matches the desired
479/// state and continuously bump the upper for the specified collection.
480///
481/// NOTE: This implementation is a bit clunky, and could be optimized by not keeping
482/// all of desired in memory (see commend below). It is meant to showcase the
483/// general approach.
484struct DifferentialWriteTask<R>
485where
486    R: FnMut() -> Pin<
487            Box<dyn Future<Output = ReadHandle<SourceData, (), Timestamp, StorageDiff>> + Send>,
488        > + Send
489        + 'static,
490{
491    /// The collection that we are writing to.
492    id: GlobalId,
493
494    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
495
496    /// For getting a [`ReadHandle`] to sync our state to persist contents.
497    read_handle_fn: R,
498
499    read_only: bool,
500
501    now: NowFn,
502
503    /// In the absence of updates, we regularly bump the upper to "now", on this
504    /// interval. This makes it so the collection remains readable at recent
505    /// timestamps.
506    upper_tick_interval: tokio::time::Interval,
507
508    /// Receiver for write commands. These change our desired state.
509    cmd_rx: mpsc::UnboundedReceiver<(StorageWriteOp, oneshot::Sender<Result<(), StorageError>>)>,
510
511    /// We have to shut down when receiving from this.
512    shutdown_rx: oneshot::Receiver<()>,
513
514    /// The contents of the collection as it should be according to whoever is
515    /// driving us around.
516    // This is memory inefficient: we always keep a full copy of
517    // desired, so that we can re-derive a to_write if/when someone else
518    // writes to persist and we notice because of an upper conflict.
519    // This is optimized for the case where we rarely have more than one
520    // writer.
521    //
522    // We can optimize for a multi-writer case by keeping an open
523    // ReadHandle and continually reading updates from persist, updating
524    // a desired in place. Similar to the self-correcting persist_sink.
525    desired: Vec<(Row, Diff)>,
526
527    /// Updates that we have to write when next writing to persist. This is
528    /// determined by looking at what is desired and what is in persist.
529    to_write: Vec<(Row, Diff)>,
530
531    /// Current upper of the persist shard. We keep track of this so that we
532    /// realize when someone else writes to the shard, in which case we have to
533    /// update our state of the world, that is update our `to_write` based on
534    /// `desired` and the contents of the persist shard.
535    current_upper: Timestamp,
536}
537
538impl<R> DifferentialWriteTask<R>
539where
540    R: FnMut() -> Pin<
541            Box<dyn Future<Output = ReadHandle<SourceData, (), Timestamp, StorageDiff>> + Send>,
542        > + Send
543        + Sync
544        + 'static,
545{
546    /// Spawns a [`DifferentialWriteTask`] in an [`mz_ore::task`] and returns
547    /// handles for interacting with it.
548    fn spawn(
549        id: GlobalId,
550        write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
551        read_handle_fn: R,
552        read_only: bool,
553        now: NowFn,
554        introspection_config: DifferentialIntrospectionConfig,
555    ) -> (DifferentialWriteChannel, WriteTask, ShutdownSender) {
556        let (tx, rx) = mpsc::unbounded_channel();
557        let (shutdown_tx, shutdown_rx) = oneshot::channel();
558
559        let upper_tick_interval = tokio::time::interval(Duration::from_millis(DEFAULT_TICK_MS));
560
561        let task = Self {
562            id,
563            write_handle,
564            read_handle_fn,
565            read_only,
566            now,
567            upper_tick_interval,
568            cmd_rx: rx,
569            shutdown_rx,
570            desired: Vec::new(),
571            to_write: Vec::new(),
572            current_upper: Timestamp::MIN,
573        };
574
575        let handle = mz_ore::task::spawn(
576            || format!("CollectionManager-differential_write_task-{id}"),
577            async move {
578                if !task.read_only {
579                    task.prepare(introspection_config).await;
580                }
581                let res = task.run().await;
582
583                match res {
584                    ControlFlow::Break(reason) => {
585                        info!("write_task-{} ending: {}", id, reason);
586                    }
587                    c @ ControlFlow::Continue(_) => {
588                        unreachable!(
589                            "cannot break out of the loop with a Continue, but got: {:?}",
590                            c
591                        );
592                    }
593                }
594            },
595        );
596
597        (tx, handle.abort_on_drop(), shutdown_tx)
598    }
599
600    /// Does any work that is required before this background task starts
601    /// writing to the given introspection collection.
602    ///
603    /// This might include consolidation, deleting older entries or seeding
604    /// in-memory state of, say, scrapers, with current collection contents.
605    async fn prepare(&self, introspection_config: DifferentialIntrospectionConfig) {
606        tracing::info!(%self.id, ?introspection_config.introspection_type, "preparing differential introspection collection for writes");
607
608        match introspection_config.introspection_type {
609            IntrospectionType::ShardMapping => {
610                // Done by the `append_shard_mappings` call.
611            }
612            IntrospectionType::Frontiers | IntrospectionType::ReplicaFrontiers => {
613                // Differential collections start with an empty
614                // desired state. No need to manually reset.
615            }
616            IntrospectionType::StorageSourceStatistics => {
617                let prev = snapshot_statistics(
618                    self.id,
619                    introspection_config.recent_upper,
620                    &introspection_config.storage_collections,
621                )
622                .await;
623
624                let scraper_token = statistics::spawn_statistics_scraper(
625                    self.id.clone(),
626                    // These do a shallow copy.
627                    introspection_config.collection_manager,
628                    Arc::clone(&introspection_config.source_statistics),
629                    prev,
630                    introspection_config.statistics_interval.clone(),
631                    introspection_config.statistics_interval_receiver.clone(),
632                    introspection_config.statistics_retention_duration,
633                    introspection_config.metrics,
634                );
635                let web_token = statistics::spawn_webhook_statistics_scraper(
636                    introspection_config.source_statistics,
637                    introspection_config.statistics_interval,
638                    introspection_config.statistics_interval_receiver,
639                );
640
641                // Make sure these are dropped when the controller is
642                // dropped, so that the internal task will stop.
643                introspection_config
644                    .introspection_tokens
645                    .lock()
646                    .expect("poisoned")
647                    .insert(self.id, Box::new((scraper_token, web_token)));
648            }
649            IntrospectionType::StorageSinkStatistics => {
650                let prev = snapshot_statistics(
651                    self.id,
652                    introspection_config.recent_upper,
653                    &introspection_config.storage_collections,
654                )
655                .await;
656
657                let scraper_token = statistics::spawn_statistics_scraper(
658                    self.id.clone(),
659                    introspection_config.collection_manager,
660                    Arc::clone(&introspection_config.sink_statistics),
661                    prev,
662                    introspection_config.statistics_interval,
663                    introspection_config.statistics_interval_receiver,
664                    introspection_config.statistics_retention_duration,
665                    introspection_config.metrics,
666                );
667
668                // Make sure this is dropped when the controller is
669                // dropped, so that the internal task will stop.
670                introspection_config
671                    .introspection_tokens
672                    .lock()
673                    .expect("poisoned")
674                    .insert(self.id, scraper_token);
675            }
676
677            IntrospectionType::ComputeDependencies
678            | IntrospectionType::ComputeOperatorHydrationStatus
679            | IntrospectionType::ComputeMaterializedViewRefreshes
680            | IntrospectionType::ComputeErrorCounts
681            | IntrospectionType::ComputeHydrationTimes
682            | IntrospectionType::ComputeObjectArrangementSizes => {
683                // Differential collections start with an empty
684                // desired state. No need to manually reset.
685            }
686
687            introspection_type @ IntrospectionType::ReplicaMetricsHistory
688            | introspection_type @ IntrospectionType::WallclockLagHistory
689            | introspection_type @ IntrospectionType::WallclockLagHistogram
690            | introspection_type @ IntrospectionType::PreparedStatementHistory
691            | introspection_type @ IntrospectionType::StatementExecutionHistory
692            | introspection_type @ IntrospectionType::SessionHistory
693            | introspection_type @ IntrospectionType::StatementLifecycleHistory
694            | introspection_type @ IntrospectionType::SqlText
695            | introspection_type @ IntrospectionType::SourceStatusHistory
696            | introspection_type @ IntrospectionType::SinkStatusHistory
697            | introspection_type @ IntrospectionType::PrivatelinkConnectionStatusHistory
698            | introspection_type @ IntrospectionType::ReplicaStatusHistory => {
699                unreachable!("not differential collection: {introspection_type:?}")
700            }
701        }
702    }
703
704    async fn run(mut self) -> ControlFlow<String> {
705        let mut updates = Vec::new();
706        loop {
707            tokio::select! {
708                // Prefer sending actual updates over just bumping the upper,
709                // because sending updates also bump the upper.
710                biased;
711
712                // Listen for a shutdown signal so we can gracefully cleanup.
713                _ = &mut self.shutdown_rx => {
714                    self.handle_shutdown();
715
716                    return ControlFlow::Break("graceful shutdown".to_string());
717                }
718
719                // Pull a chunk of queued updates off the channel.
720                () = recv_all_commands(&mut self.cmd_rx, &mut updates) => {
721                    if updates.is_empty() {
722                        // Sender has been dropped, which means the collection
723                        // should have been unregistered, break out of the run
724                        // loop if we weren't already aborted.
725                        return ControlFlow::Break("sender has been dropped".to_string());
726                    }
727                    self.handle_updates(&mut updates).await?;
728                }
729
730                // If we haven't received any updates, then we'll move the upper forward.
731                _ = self.upper_tick_interval.tick() => {
732                    if self.read_only {
733                        // Not bumping uppers while in read-only mode.
734                        continue;
735                    }
736                    self.tick_upper().await?;
737                },
738            }
739        }
740    }
741
742    async fn tick_upper(&mut self) -> ControlFlow<String> {
743        let now = Timestamp::from((self.now)());
744
745        if now <= self.current_upper {
746            // Upper is already further along than current wall-clock time, no
747            // need to bump it.
748            return ControlFlow::Continue(());
749        }
750
751        assert!(!self.read_only);
752        let res = self
753            .write_handle
754            .compare_and_append_batch(
755                &mut [],
756                Antichain::from_elem(self.current_upper),
757                Antichain::from_elem(now),
758                true,
759            )
760            .await
761            .expect("valid usage");
762        match res {
763            // All good!
764            Ok(()) => {
765                tracing::debug!(%self.id, "bumped upper of differential collection");
766                self.current_upper = now;
767            }
768            Err(err) => {
769                // Someone else wrote to the collection or bumped the upper. We
770                // need to sync to latest persist state and potentially patch up
771                // our `to_write`, based on what we learn and `desired`.
772
773                let actual_upper = if let Some(ts) = err.current.as_option() {
774                    *ts
775                } else {
776                    return ControlFlow::Break("upper is the empty antichain".to_string());
777                };
778
779                tracing::info!(%self.id, ?actual_upper, expected_upper = ?self.current_upper, "upper mismatch while bumping upper, syncing to persist state");
780
781                self.current_upper = actual_upper;
782
783                self.sync_to_persist().await;
784            }
785        }
786
787        ControlFlow::Continue(())
788    }
789
790    fn handle_shutdown(&mut self) {
791        let mut senders = Vec::new();
792
793        // Prevent new messages from being sent.
794        self.cmd_rx.close();
795
796        // Get as many waiting senders as possible.
797        while let Ok((_batch, sender)) = self.cmd_rx.try_recv() {
798            senders.push(sender);
799        }
800
801        // Notify them that this collection is closed.
802        //
803        // Note: if a task is shutting down, that indicates the source has been
804        // dropped, at which point the identifier is invalid. Returning this
805        // error provides a better user experience.
806        notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));
807    }
808
809    async fn handle_updates(
810        &mut self,
811        batch: &mut Vec<(StorageWriteOp, oneshot::Sender<Result<(), StorageError>>)>,
812    ) -> ControlFlow<String> {
813        // Put in place _some_ rate limiting.
814        let batch_duration_ms = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT;
815
816        let use_batch_now = Instant::now();
817        let min_time_to_complete = use_batch_now + batch_duration_ms;
818
819        tracing::debug!(
820            ?use_batch_now,
821            ?batch_duration_ms,
822            ?min_time_to_complete,
823            "batch duration",
824        );
825
826        let mut responders = Vec::with_capacity(batch.len());
827        for (op, tx) in batch.drain(..) {
828            self.apply_write_op(op);
829            responders.push(tx);
830        }
831
832        // TODO: Maybe don't do it every time?
833        consolidation::consolidate(&mut self.desired);
834        consolidation::consolidate(&mut self.to_write);
835
836        // Reset the interval which is used to periodically bump the uppers
837        // because the uppers will get bumped with the following update.
838        // This makes it such that we will write at most once every
839        // `interval`.
840        //
841        // For example, let's say our `DEFAULT_TICK` interval is 10, so at
842        // `t + 10`, `t + 20`, ... we'll bump the uppers. If we receive an
843        // update at `t + 3` we want to shift this window so we bump the
844        // uppers at `t + 13`, `t + 23`, ... which resetting the interval
845        // accomplishes.
846        self.upper_tick_interval.reset();
847
848        self.write_to_persist(responders).await?;
849
850        // Wait until our artificial latency has completed.
851        //
852        // Note: if writing to persist took longer than `DEFAULT_TICK` this
853        // await will resolve immediately.
854        tokio::time::sleep_until(min_time_to_complete).await;
855
856        ControlFlow::Continue(())
857    }
858
859    /// Apply the given write operation to the `desired`/`to_write` state.
860    fn apply_write_op(&mut self, op: StorageWriteOp) {
861        match op {
862            StorageWriteOp::Append { updates } => {
863                self.desired.extend_from_slice(&updates);
864                self.to_write.extend(updates);
865            }
866            StorageWriteOp::Delete { filter } => {
867                let to_delete = self.desired.extract_if(.., |(row, _)| filter(row));
868                let retractions = to_delete.map(|(row, diff)| (row, -diff));
869                self.to_write.extend(retractions);
870            }
871        }
872    }
873
874    /// Attempt to write what is currently in [Self::to_write] to persist,
875    /// retrying and re-syncing to persist when necessary, that is when the
876    /// upper was not what we expected.
877    async fn write_to_persist(
878        &mut self,
879        responders: Vec<oneshot::Sender<Result<(), StorageError>>>,
880    ) -> ControlFlow<String> {
881        if self.read_only {
882            tracing::debug!(%self.id, "not writing to differential collection: read-only");
883            // Not attempting to write while in read-only mode.
884            return ControlFlow::Continue(());
885        }
886
887        // We'll try really hard to succeed, but eventually stop.
888        //
889        // Note: it's very rare we should ever need to retry, and if we need to
890        // retry it should only take 1 or 2 attempts. We set `max_tries` to be
891        // high though because if we hit some edge case we want to try hard to
892        // commit the data.
893        let retries = Retry::default()
894            .initial_backoff(Duration::from_secs(1))
895            .clamp_backoff(Duration::from_secs(3))
896            .factor(1.25)
897            .max_tries(20)
898            .into_retry_stream();
899        let mut retries = Box::pin(retries);
900
901        loop {
902            // Append updates to persist!
903            let now = Timestamp::from((self.now)());
904            let new_upper = std::cmp::max(now, self.current_upper.step_forward());
905
906            let updates_to_write = self
907                .to_write
908                .iter()
909                .map(|(row, diff)| {
910                    (
911                        (SourceData(Ok(row.clone())), ()),
912                        self.current_upper,
913                        diff.into_inner(),
914                    )
915                })
916                .collect::<Vec<_>>();
917
918            assert!(!self.read_only);
919            let res = self
920                .write_handle
921                .compare_and_append(
922                    updates_to_write,
923                    Antichain::from_elem(self.current_upper),
924                    Antichain::from_elem(new_upper),
925                )
926                .await
927                .expect("valid usage");
928            match res {
929                // Everything was successful!
930                Ok(()) => {
931                    // Notify all of our listeners.
932                    notify_listeners(responders, || Ok(()));
933
934                    self.current_upper = new_upper;
935
936                    // Very important! This is empty at steady state, while
937                    // desired keeps an in-memory copy of desired state.
938                    self.to_write.clear();
939
940                    tracing::debug!(%self.id, "appended to differential collection");
941
942                    // Break out of the retry loop so we can wait for more data.
943                    break;
944                }
945                // Failed to write to some collections,
946                Err(err) => {
947                    // Someone else wrote to the collection. We need to read
948                    // from persist and update to_write based on that and the
949                    // desired state.
950                    let actual_upper = if let Some(ts) = err.current.as_option() {
951                        *ts
952                    } else {
953                        return ControlFlow::Break("upper is the empty antichain".to_string());
954                    };
955
956                    tracing::info!(%self.id, ?actual_upper, expected_upper = ?self.current_upper, "retrying append for differential collection");
957
958                    // We've exhausted all of our retries, notify listeners and
959                    // break out of the retry loop so we can wait for more data.
960                    if retries.next().await.is_none() {
961                        let invalid_upper = InvalidUpper {
962                            id: self.id,
963                            current_upper: err.current,
964                        };
965                        notify_listeners(responders, || {
966                            Err(StorageError::InvalidUppers(vec![invalid_upper.clone()]))
967                        });
968                        error!(
969                            "exhausted retries when appending to managed collection {}",
970                            self.id
971                        );
972                        break;
973                    }
974
975                    self.current_upper = actual_upper;
976
977                    self.sync_to_persist().await;
978
979                    debug!(
980                        "Retrying invalid-uppers error while appending to differential collection {}",
981                        self.id
982                    );
983                }
984            }
985        }
986
987        ControlFlow::Continue(())
988    }
989
990    /// Re-derives [Self::to_write] by looking at [Self::desired] and the
991    /// current state in persist. We want to insert everything in desired and
992    /// retract everything in persist. But ideally most of that cancels out in
993    /// consolidation.
994    ///
995    /// To be called when a `compare_and_append` failed because the upper didn't
996    /// match what we expected.
997    async fn sync_to_persist(&mut self) {
998        let mut read_handle = (self.read_handle_fn)().await;
999        let as_of = self.current_upper.step_back().unwrap_or(Timestamp::MIN);
1000        let as_of = Antichain::from_elem(as_of);
1001        let snapshot = read_handle.snapshot_and_fetch(as_of).await;
1002
1003        let mut negated_oks = match snapshot {
1004            Ok(contents) => {
1005                let mut snapshot = Vec::with_capacity(contents.len());
1006                for ((data, _), _, diff) in contents {
1007                    let row = data.0.unwrap();
1008                    snapshot.push((row, -Diff::from(diff)));
1009                }
1010                snapshot
1011            }
1012            Err(e) => panic!("read before since: {e:?}"),
1013        };
1014
1015        self.to_write.clear();
1016        self.to_write.extend(self.desired.iter().cloned());
1017        self.to_write.append(&mut negated_oks);
1018        consolidation::consolidate(&mut self.to_write);
1019    }
1020}
1021
1022pub(crate) struct AppendOnlyIntrospectionConfig {
1023    pub(crate) introspection_type: IntrospectionType,
1024    pub(crate) config_set: Arc<ConfigSet>,
1025    pub(crate) parameters: StorageParameters,
1026    pub(crate) storage_collections: Arc<dyn StorageCollections + Send + Sync>,
1027}
1028
1029/// A task that writes to an append only collection and continuously bumps the upper for the specified
1030/// collection.
1031///
1032/// For status history collections, this task can deduplicate redundant [`Statuses`](Status).
1033struct AppendOnlyWriteTask {
1034    /// The collection that we are writing to.
1035    id: GlobalId,
1036    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1037    read_only: bool,
1038    now: NowFn,
1039    user_batch_duration_ms: Arc<AtomicU64>,
1040    /// Receiver for write commands.
1041    rx: mpsc::UnboundedReceiver<(
1042        Vec<AppendOnlyUpdate>,
1043        oneshot::Sender<Result<(), StorageError>>,
1044    )>,
1045
1046    /// We have to shut down when receiving from this.
1047    shutdown_rx: oneshot::Receiver<()>,
1048    /// If this collection deduplicates statuses, this map is used to track the previous status.
1049    previous_statuses: Option<BTreeMap<(GlobalId, Option<ReplicaId>), Status>>,
1050}
1051
1052impl AppendOnlyWriteTask {
1053    /// Spawns an [`AppendOnlyWriteTask`] in an [`mz_ore::task`] that will continuously bump the
1054    /// upper for the specified collection,
1055    /// and append data that is sent via the provided [`mpsc::UnboundedSender`].
1056    ///
1057    /// TODO(parkmycar): One day if we want to customize the tick interval for each collection, that
1058    /// should be done here.
1059    /// TODO(parkmycar): Maybe add prometheus metrics for each collection?
1060    fn spawn(
1061        id: GlobalId,
1062        write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1063        read_only: bool,
1064        now: NowFn,
1065        user_batch_duration_ms: Arc<AtomicU64>,
1066        introspection_config: Option<AppendOnlyIntrospectionConfig>,
1067    ) -> (AppendOnlyWriteChannel, WriteTask, ShutdownSender) {
1068        let (tx, rx) = mpsc::unbounded_channel();
1069        let (shutdown_tx, shutdown_rx) = oneshot::channel();
1070
1071        let previous_statuses: Option<BTreeMap<(GlobalId, Option<ReplicaId>), Status>> =
1072            match introspection_config
1073                .as_ref()
1074                .map(|config| config.introspection_type)
1075            {
1076                Some(IntrospectionType::SourceStatusHistory)
1077                | Some(IntrospectionType::SinkStatusHistory) => Some(BTreeMap::new()),
1078
1079                Some(IntrospectionType::ReplicaMetricsHistory)
1080                | Some(IntrospectionType::WallclockLagHistory)
1081                | Some(IntrospectionType::WallclockLagHistogram)
1082                | Some(IntrospectionType::PrivatelinkConnectionStatusHistory)
1083                | Some(IntrospectionType::ReplicaStatusHistory)
1084                | Some(IntrospectionType::PreparedStatementHistory)
1085                | Some(IntrospectionType::StatementExecutionHistory)
1086                | Some(IntrospectionType::SessionHistory)
1087                | Some(IntrospectionType::StatementLifecycleHistory)
1088                | Some(IntrospectionType::SqlText)
1089                | None => None,
1090
1091                Some(introspection_type @ IntrospectionType::ShardMapping)
1092                | Some(introspection_type @ IntrospectionType::Frontiers)
1093                | Some(introspection_type @ IntrospectionType::ReplicaFrontiers)
1094                | Some(introspection_type @ IntrospectionType::StorageSourceStatistics)
1095                | Some(introspection_type @ IntrospectionType::StorageSinkStatistics)
1096                | Some(introspection_type @ IntrospectionType::ComputeDependencies)
1097                | Some(introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus)
1098                | Some(introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes)
1099                | Some(introspection_type @ IntrospectionType::ComputeErrorCounts)
1100                | Some(introspection_type @ IntrospectionType::ComputeHydrationTimes)
1101                | Some(introspection_type @ IntrospectionType::ComputeObjectArrangementSizes) => {
1102                    unreachable!("not append-only collection: {introspection_type:?}")
1103                }
1104            };
1105
1106        let mut task = Self {
1107            id,
1108            write_handle,
1109            rx,
1110            shutdown_rx,
1111            read_only,
1112            now,
1113            user_batch_duration_ms,
1114            previous_statuses,
1115        };
1116
1117        let handle = mz_ore::task::spawn(
1118            || format!("CollectionManager-append_only_write_task-{id}"),
1119            async move {
1120                if !task.read_only {
1121                    task.prepare(introspection_config).await;
1122                }
1123                task.run().await;
1124            },
1125        );
1126
1127        (tx, handle.abort_on_drop(), shutdown_tx)
1128    }
1129
1130    /// Does any work that is required before the background task starts
1131    /// writing to the given append only introspection collection.
1132    ///
1133    /// This might include consolidation or deleting older entries.
1134    async fn prepare(&mut self, introspection_config: Option<AppendOnlyIntrospectionConfig>) {
1135        let Some(AppendOnlyIntrospectionConfig {
1136            introspection_type,
1137            config_set,
1138            parameters,
1139            storage_collections,
1140        }) = introspection_config
1141        else {
1142            return;
1143        };
1144        let initial_statuses = match introspection_type {
1145            IntrospectionType::ReplicaMetricsHistory
1146            | IntrospectionType::WallclockLagHistory
1147            | IntrospectionType::WallclockLagHistogram => {
1148                let result = partially_truncate_metrics_history(
1149                    self.id,
1150                    introspection_type,
1151                    &mut self.write_handle,
1152                    config_set,
1153                    self.now.clone(),
1154                    storage_collections,
1155                )
1156                .await;
1157                if let Err(error) = result {
1158                    soft_panic_or_log!(
1159                        "error truncating metrics history: {error} (type={introspection_type:?})"
1160                    );
1161                }
1162                Vec::new()
1163            }
1164
1165            IntrospectionType::PrivatelinkConnectionStatusHistory => {
1166                partially_truncate_status_history(
1167                    self.id,
1168                    IntrospectionType::PrivatelinkConnectionStatusHistory,
1169                    &mut self.write_handle,
1170                    privatelink_status_history_desc(&parameters),
1171                    self.now.clone(),
1172                    &storage_collections,
1173                )
1174                .await;
1175                Vec::new()
1176            }
1177            IntrospectionType::ReplicaStatusHistory => {
1178                partially_truncate_status_history(
1179                    self.id,
1180                    IntrospectionType::ReplicaStatusHistory,
1181                    &mut self.write_handle,
1182                    replica_status_history_desc(&parameters),
1183                    self.now.clone(),
1184                    &storage_collections,
1185                )
1186                .await;
1187                Vec::new()
1188            }
1189
1190            // Note [btv] - we don't truncate these, because that uses
1191            // a huge amount of memory on environmentd startup.
1192            IntrospectionType::PreparedStatementHistory
1193            | IntrospectionType::StatementExecutionHistory
1194            | IntrospectionType::SessionHistory
1195            | IntrospectionType::StatementLifecycleHistory
1196            | IntrospectionType::SqlText => {
1197                // NOTE(aljoscha): We never remove from these
1198                // collections. Someone, at some point needs to
1199                // think about that! Issue:
1200                // https://github.com/MaterializeInc/database-issues/issues/7666
1201                Vec::new()
1202            }
1203
1204            IntrospectionType::SourceStatusHistory => {
1205                let last_status_per_id = partially_truncate_status_history(
1206                    self.id,
1207                    IntrospectionType::SourceStatusHistory,
1208                    &mut self.write_handle,
1209                    source_status_history_desc(&parameters),
1210                    self.now.clone(),
1211                    &storage_collections,
1212                )
1213                .await;
1214
1215                let status_col = MZ_SOURCE_STATUS_HISTORY_DESC
1216                    .get_by_name(&ColumnName::from("status"))
1217                    .expect("schema has not changed")
1218                    .0;
1219
1220                last_status_per_id
1221                    .into_iter()
1222                    .map(|(id, row)| {
1223                        (
1224                            id,
1225                            Status::from_str(
1226                                row.iter()
1227                                    .nth(status_col)
1228                                    .expect("schema has not changed")
1229                                    .unwrap_str(),
1230                            )
1231                            .expect("statuses must be uncorrupted"),
1232                        )
1233                    })
1234                    .collect()
1235            }
1236            IntrospectionType::SinkStatusHistory => {
1237                let last_status_per_id = partially_truncate_status_history(
1238                    self.id,
1239                    IntrospectionType::SinkStatusHistory,
1240                    &mut self.write_handle,
1241                    sink_status_history_desc(&parameters),
1242                    self.now.clone(),
1243                    &storage_collections,
1244                )
1245                .await;
1246
1247                let status_col = MZ_SINK_STATUS_HISTORY_DESC
1248                    .get_by_name(&ColumnName::from("status"))
1249                    .expect("schema has not changed")
1250                    .0;
1251
1252                last_status_per_id
1253                    .into_iter()
1254                    .map(|(id, row)| {
1255                        (
1256                            id,
1257                            Status::from_str(
1258                                row.iter()
1259                                    .nth(status_col)
1260                                    .expect("schema has not changed")
1261                                    .unwrap_str(),
1262                            )
1263                            .expect("statuses must be uncorrupted"),
1264                        )
1265                    })
1266                    .collect()
1267            }
1268
1269            introspection_type @ IntrospectionType::ShardMapping
1270            | introspection_type @ IntrospectionType::Frontiers
1271            | introspection_type @ IntrospectionType::ReplicaFrontiers
1272            | introspection_type @ IntrospectionType::StorageSourceStatistics
1273            | introspection_type @ IntrospectionType::StorageSinkStatistics
1274            | introspection_type @ IntrospectionType::ComputeDependencies
1275            | introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus
1276            | introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes
1277            | introspection_type @ IntrospectionType::ComputeErrorCounts
1278            | introspection_type @ IntrospectionType::ComputeHydrationTimes
1279            | introspection_type @ IntrospectionType::ComputeObjectArrangementSizes => {
1280                unreachable!("not append-only collection: {introspection_type:?}")
1281            }
1282        };
1283        if let Some(previous_statuses) = &mut self.previous_statuses {
1284            previous_statuses.extend(initial_statuses);
1285        }
1286    }
1287
1288    async fn run(mut self) {
1289        let mut interval = tokio::time::interval(Duration::from_millis(DEFAULT_TICK_MS));
1290
1291        let mut batch: Vec<(Vec<_>, _)> = Vec::new();
1292
1293        'run: loop {
1294            tokio::select! {
1295                // Prefer sending actual updates over just bumping the upper, because sending
1296                // updates also bump the upper.
1297                biased;
1298
1299                // Listen for a shutdown signal so we can gracefully cleanup.
1300                _ = &mut self.shutdown_rx => {
1301                    let mut senders = Vec::new();
1302
1303                    // Prevent new messages from being sent.
1304                    self.rx.close();
1305
1306                    // Get as many waiting senders as possible.
1307                    while let Ok((_batch, sender)) = self.rx.try_recv() {
1308                        senders.push(sender);
1309                    }
1310
1311                    // Notify them that this collection is closed.
1312                    //
1313                    // Note: if a task is shutting down, that indicates the source has been
1314                    // dropped, at which point the identifier is invalid. Returning this
1315                    // error provides a better user experience.
1316                    notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));
1317
1318                    break 'run;
1319                }
1320
1321                // Pull a chunk of queued updates off the channel.
1322                () = recv_all_commands(&mut self.rx, &mut batch) => {
1323                    if batch.is_empty() {
1324                        // Sender has been dropped, which means the collection should have been
1325                        // unregistered, break out of the run loop if we weren't already
1326                        // aborted.
1327                        break 'run;
1328                    }
1329
1330                    // To rate limit appends to persist we add artificial latency, and will
1331                    // finish no sooner than this instant.
1332                    let batch_duration_ms = match self.id {
1333                        GlobalId::User(_) => Duration::from_millis(
1334                            self.user_batch_duration_ms.load(Ordering::Relaxed),
1335                        ),
1336                        // For non-user collections, always just use the default.
1337                        _ => STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT,
1338                    };
1339                    let use_batch_now = Instant::now();
1340                    let min_time_to_complete = use_batch_now + batch_duration_ms;
1341
1342                    tracing::debug!(
1343                        ?use_batch_now,
1344                        ?batch_duration_ms,
1345                        ?min_time_to_complete,
1346                        "batch duration",
1347                    );
1348
1349                    // Reset the interval which is used to periodically bump the uppers
1350                    // because the uppers will get bumped with the following update. This
1351                    // makes it such that we will write at most once every `interval`.
1352                    //
1353                    // For example, let's say our `DEFAULT_TICK` interval is 10, so at
1354                    // `t + 10`, `t + 20`, ... we'll bump the uppers. If we receive an
1355                    // update at `t + 3` we want to shift this window so we bump the uppers
1356                    // at `t + 13`, `t + 23`, ... which resetting the interval accomplishes.
1357                    interval.reset();
1358
1359                    let capacity: usize = batch
1360                        .iter()
1361                        .map(|(rows, _)| rows.len())
1362                        .sum();
1363                    let mut all_rows = Vec::with_capacity(capacity);
1364                    let mut responders = Vec::with_capacity(batch.len());
1365
1366                    for (updates, responder) in batch.drain(..) {
1367                        let rows = self.process_updates(updates);
1368
1369                        all_rows.extend(
1370                            rows.map(|(row, diff)| TimestamplessUpdate { row, diff }),
1371                        );
1372                        responders.push(responder);
1373                    }
1374
1375                    if self.read_only {
1376                        tracing::warn!(%self.id, ?all_rows, "append while in read-only mode");
1377                        notify_listeners(responders, || Err(StorageError::ReadOnly));
1378                        continue;
1379                    }
1380
1381                    // Append updates to persist!
1382                    let at_least = Timestamp::from((self.now)());
1383
1384                    if !all_rows.is_empty() {
1385                        monotonic_append(&mut self.write_handle, all_rows, at_least).await;
1386                    }
1387                    // Notify all of our listeners.
1388                    notify_listeners(responders, || Ok(()));
1389
1390                    // Wait until our artificial latency has completed.
1391                    //
1392                    // Note: if writing to persist took longer than `DEFAULT_TICK` this
1393                    // await will resolve immediately.
1394                    tokio::time::sleep_until(min_time_to_complete).await;
1395                }
1396
1397                // If we haven't received any updates, then we'll move the upper forward.
1398                _ = interval.tick() => {
1399                    if self.read_only {
1400                        // Not bumping uppers while in read-only mode.
1401                        continue;
1402                    }
1403
1404                    // Update our collection.
1405                    let now = Timestamp::from((self.now)());
1406                    let updates = vec![];
1407                    let at_least = now;
1408
1409                    // Failures don't matter when advancing collections' uppers. This might
1410                    // fail when a clusterd happens to be writing to this concurrently.
1411                    // Advancing uppers here is best-effort and only needs to succeed if no
1412                    // one else is advancing it; contention proves otherwise.
1413                    monotonic_append(&mut self.write_handle, updates, at_least).await;
1414                },
1415            }
1416        }
1417
1418        info!("write_task-{} ending", self.id);
1419    }
1420
1421    /// Deduplicate any [`mz_storage_client::client::StatusUpdate`] within `updates` and converts
1422    /// `updates` to rows and diffs.
1423    fn process_updates(
1424        &mut self,
1425        updates: Vec<AppendOnlyUpdate>,
1426    ) -> impl Iterator<Item = (Row, Diff)> {
1427        let updates = if let Some(previous_statuses) = &mut self.previous_statuses {
1428            let new: Vec<_> = updates
1429                .into_iter()
1430                .filter(|r| match r {
1431                    AppendOnlyUpdate::Row(_) => true,
1432                    AppendOnlyUpdate::Status(update) => {
1433                        match (
1434                            previous_statuses
1435                                .get(&(update.id, update.replica_id))
1436                                .as_deref(),
1437                            &update.status,
1438                        ) {
1439                            (None, _) => true,
1440                            (Some(old), new) => old.superseded_by(*new),
1441                        }
1442                    }
1443                })
1444                .collect();
1445            previous_statuses.extend(new.iter().filter_map(|update| match update {
1446                AppendOnlyUpdate::Row(_) => None,
1447                AppendOnlyUpdate::Status(update) => {
1448                    Some(((update.id, update.replica_id), update.status))
1449                }
1450            }));
1451            new
1452        } else {
1453            updates
1454        };
1455
1456        updates.into_iter().map(AppendOnlyUpdate::into_row)
1457    }
1458}
1459
1460/// Truncates the given metrics history by removing all entries older than that history's
1461/// configured retention interval.
1462///
1463/// # Panics
1464///
1465/// Panics if `collection` is not a metrics history.
1466async fn partially_truncate_metrics_history(
1467    id: GlobalId,
1468    introspection_type: IntrospectionType,
1469    write_handle: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1470    config_set: Arc<ConfigSet>,
1471    now: NowFn,
1472    storage_collections: Arc<dyn StorageCollections + Send + Sync>,
1473) -> Result<(), anyhow::Error> {
1474    let (keep_duration, occurred_at_col) = match introspection_type {
1475        IntrospectionType::ReplicaMetricsHistory => (
1476            REPLICA_METRICS_HISTORY_RETENTION_INTERVAL.get(&config_set),
1477            REPLICA_METRICS_HISTORY_DESC
1478                .get_by_name(&ColumnName::from("occurred_at"))
1479                .expect("schema has not changed")
1480                .0,
1481        ),
1482        IntrospectionType::WallclockLagHistory => (
1483            WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL.get(&config_set),
1484            WALLCLOCK_LAG_HISTORY_DESC
1485                .get_by_name(&ColumnName::from("occurred_at"))
1486                .expect("schema has not changed")
1487                .0,
1488        ),
1489        IntrospectionType::WallclockLagHistogram => (
1490            WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL.get(&config_set),
1491            WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC
1492                .get_by_name(&ColumnName::from("period_start"))
1493                .expect("schema has not changed")
1494                .0,
1495        ),
1496        _ => panic!("not a metrics history: {introspection_type:?}"),
1497    };
1498
1499    let upper = write_handle.fetch_recent_upper().await;
1500    let Some(upper_ts) = upper.as_option() else {
1501        bail!("collection is sealed");
1502    };
1503    let Some(as_of_ts) = upper_ts.step_back() else {
1504        return Ok(()); // nothing to truncate
1505    };
1506
1507    let mut rows = storage_collections
1508        .snapshot_cursor(id, as_of_ts)
1509        .await
1510        .map_err(|e| anyhow!("reading snapshot: {e:?}"))?;
1511
1512    let now = mz_ore::now::to_datetime(now());
1513    let keep_since = now - keep_duration;
1514
1515    // It is very important that we append our retractions at the timestamp
1516    // right after the timestamp at which we got our snapshot. Otherwise,
1517    // it's possible for someone else to sneak in retractions or other
1518    // unexpected changes.
1519    let old_upper_ts = *upper_ts;
1520    let new_upper_ts = old_upper_ts.step_forward();
1521
1522    // Produce retractions by inverting diffs of rows we want to delete.
1523    let mut builder = write_handle.builder(Antichain::from_elem(old_upper_ts));
1524    // Re-used allocation for decoding rows, avoids allocating a fresh vector per row.
1525    let mut datum_vec = DatumVec::new();
1526    while let Some(chunk) = rows.next().await {
1527        for (data, _t, diff) in chunk {
1528            let Ok(row) = &data.0 else { continue };
1529            let datums = datum_vec.borrow_with(row);
1530            let occurred_at = datums[occurred_at_col].unwrap_timestamptz();
1531            if *occurred_at >= keep_since {
1532                continue;
1533            }
1534            let diff = -diff;
1535            match builder.add(&data, &(), &old_upper_ts, &diff).await? {
1536                Added::Record => {}
1537                Added::RecordAndParts => {
1538                    debug!(?id, "added part to builder");
1539                }
1540            }
1541        }
1542    }
1543
1544    let mut updates = builder.finish(Antichain::from_elem(new_upper_ts)).await?;
1545    let mut batches = vec![&mut updates];
1546
1547    write_handle
1548        .compare_and_append_batch(
1549            batches.as_mut_slice(),
1550            Antichain::from_elem(old_upper_ts),
1551            Antichain::from_elem(new_upper_ts),
1552            true,
1553        )
1554        .await
1555        .expect("valid usage")
1556        .map_err(|e| anyhow!("appending retractions: {e:?}"))
1557}
1558
1559/// Effectively truncates the status history shard based on its retention policy.
1560///
1561/// NOTE: The history collections are really append-only collections, but
1562/// every-now-and-then we want to retract old updates so that the collection
1563/// does not grow unboundedly. Crucially, these are _not_ incremental
1564/// collections, they are not derived from a state at some time `t` and we
1565/// cannot maintain a desired state for them.
1566///
1567/// Returns a map with latest unpacked row per key.
1568pub(crate) async fn partially_truncate_status_history<K>(
1569    id: GlobalId,
1570    introspection_type: IntrospectionType,
1571    write_handle: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1572    status_history_desc: StatusHistoryDesc<K>,
1573    now: NowFn,
1574    storage_collections: &Arc<dyn StorageCollections + Send + Sync>,
1575) -> BTreeMap<K, Row>
1576where
1577    K: Clone + Debug + Ord + Send + Sync,
1578{
1579    let upper = write_handle.fetch_recent_upper().await.clone();
1580
1581    let mut rows = match upper.as_option() {
1582        Some(f) if f > &Timestamp::MIN => {
1583            let as_of = f.step_back().unwrap();
1584
1585            storage_collections
1586                .snapshot_cursor(id, as_of)
1587                .await
1588                .expect("snapshot succeeds")
1589        }
1590        // If collection is closed or the frontier is the minimum, we cannot
1591        // or don't need to truncate (respectively).
1592        _ => return BTreeMap::new(),
1593    };
1594
1595    // BTreeMap to keep track of the row with the latest timestamp for each key.
1596    let mut latest_row_per_key: BTreeMap<K, (CheckedTimestamp<DateTime<Utc>>, Row)> =
1597        BTreeMap::new();
1598
1599    // It is very important that we append our retractions at the timestamp
1600    // right after the timestamp at which we got our snapshot. Otherwise,
1601    // it's possible for someone else to sneak in retractions or other
1602    // unexpected changes.
1603    let expected_upper = upper.into_option().expect("checked above");
1604    let new_upper = expected_upper.step_forward();
1605
1606    let mut deletions = write_handle.builder(Antichain::from_elem(expected_upper));
1607
1608    let mut handle_row = {
1609        let latest_row_per_key = &mut latest_row_per_key;
1610        // Re-used allocation for decoding rows, avoids allocating a fresh vector per row.
1611        let mut datum_vec = DatumVec::new();
1612        move |row: &Row, diff| {
1613            let datums = datum_vec.borrow_with(row);
1614            let key = (status_history_desc.extract_key)(&datums);
1615            let timestamp = (status_history_desc.extract_time)(&datums);
1616
1617            assert!(
1618                diff > 0,
1619                "only know how to operate over consolidated data with diffs > 0, \
1620                    found diff {diff} for object {key:?} in {introspection_type:?}",
1621            );
1622
1623            // Keep track of the timestamp of the latest row per key.
1624            match latest_row_per_key.get(&key) {
1625                Some(existing) if &existing.0 > &timestamp => {}
1626                _ => {
1627                    latest_row_per_key.insert(key.clone(), (timestamp, row.clone()));
1628                }
1629            };
1630            (key, timestamp)
1631        }
1632    };
1633
1634    match status_history_desc.retention_policy {
1635        StatusHistoryRetentionPolicy::LastN(n) => {
1636            // BTreeMap to track the earliest events for each key.
1637            let mut last_n_entries_per_key: BTreeMap<
1638                K,
1639                BinaryHeap<Reverse<(CheckedTimestamp<DateTime<Utc>>, Row)>>,
1640            > = BTreeMap::new();
1641
1642            while let Some(chunk) = rows.next().await {
1643                for (data, _t, diff) in chunk {
1644                    let Ok(row) = &data.0 else { continue };
1645                    let (key, timestamp) = handle_row(row, diff);
1646
1647                    // Duplicate rows ARE possible if many status changes happen in VERY quick succession,
1648                    // so we handle duplicated rows separately.
1649                    let entries = last_n_entries_per_key.entry(key).or_default();
1650                    for _ in 0..diff {
1651                        // We CAN have multiple statuses (most likely Starting and Running) at the exact same
1652                        // millisecond, depending on how the `health_operator` is scheduled.
1653                        //
1654                        // Note that these will be arbitrarily ordered, so a Starting event might
1655                        // survive and a Running one won't. The next restart will remove the other,
1656                        // so we don't bother being careful about it.
1657                        //
1658                        // TODO(guswynn): unpack these into health-status objects and use
1659                        // their `Ord` impl.
1660                        entries.push(Reverse((timestamp, row.clone())));
1661
1662                        // Retain some number of entries, using pop to mark the oldest entries for
1663                        // deletion.
1664                        while entries.len() > n {
1665                            if let Some(Reverse((_, r))) = entries.pop() {
1666                                deletions
1667                                    .add(&SourceData(Ok(r)), &(), &expected_upper, &-1)
1668                                    .await
1669                                    .expect("usage should be valid");
1670                            }
1671                        }
1672                    }
1673                }
1674            }
1675        }
1676        StatusHistoryRetentionPolicy::TimeWindow(time_window) => {
1677            // Get the lower bound of our retention window
1678            let now = mz_ore::now::to_datetime(now());
1679            let keep_since = now - time_window;
1680
1681            // Mark any row outside the retention window for deletion
1682            while let Some(chunk) = rows.next().await {
1683                for (data, _t, diff) in chunk {
1684                    let Ok(row) = &data.0 else { continue };
1685                    let (_, timestamp) = handle_row(row, diff);
1686
1687                    if *timestamp < keep_since {
1688                        deletions
1689                            .add(&data, &(), &expected_upper, &-1)
1690                            .await
1691                            .expect("usage should be valid");
1692                    }
1693                }
1694            }
1695        }
1696    }
1697
1698    let mut updates = deletions
1699        .finish(Antichain::from_elem(new_upper))
1700        .await
1701        .expect("expected valid usage");
1702    let mut batches = vec![&mut updates];
1703
1704    // Updates are only deletes because everything else is already in the shard.\
1705    let res = write_handle
1706        .compare_and_append_batch(
1707            batches.as_mut_slice(),
1708            Antichain::from_elem(expected_upper),
1709            Antichain::from_elem(new_upper),
1710            true,
1711        )
1712        .await
1713        .expect("usage was valid");
1714
1715    match res {
1716        Ok(_) => {
1717            // All good, yay!
1718        }
1719        Err(err) => {
1720            // This is fine, it just means the upper moved because
1721            // of continual upper advancement or because someone
1722            // already appended some more retractions/updates.
1723            //
1724            // NOTE: We might want to attempt these partial
1725            // retractions on an interval, instead of only when
1726            // starting up!
1727            info!(
1728                %id, ?expected_upper, current_upper = ?err.current,
1729                "failed to append partial truncation",
1730            );
1731        }
1732    }
1733
1734    latest_row_per_key
1735        .into_iter()
1736        .map(|(key, (_, row))| (key, row))
1737        .collect()
1738}
1739
1740async fn monotonic_append(
1741    write_handle: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1742    updates: Vec<TimestamplessUpdate>,
1743    at_least: Timestamp,
1744) {
1745    let mut expected_upper = write_handle.shared_upper();
1746    loop {
1747        if updates.is_empty() && expected_upper.is_empty() {
1748            // Ignore timestamp advancement for
1749            // closed collections. TODO? Make this a
1750            // correctable error
1751            return;
1752        }
1753
1754        let upper = expected_upper
1755            .into_option()
1756            .expect("cannot append data to closed collection");
1757
1758        let lower = std::cmp::max(upper, at_least);
1759        let new_upper = lower.step_forward();
1760        let updates = updates
1761            .iter()
1762            .map(|TimestamplessUpdate { row, diff }| {
1763                ((SourceData(Ok(row.clone())), ()), lower, diff.into_inner())
1764            })
1765            .collect::<Vec<_>>();
1766        let res = write_handle
1767            .compare_and_append(
1768                updates,
1769                Antichain::from_elem(upper),
1770                Antichain::from_elem(new_upper),
1771            )
1772            .await
1773            .expect("valid usage");
1774        match res {
1775            Ok(()) => return,
1776            Err(err) => {
1777                expected_upper = err.current;
1778                continue;
1779            }
1780        }
1781    }
1782}
1783
1784// Helper method for notifying listeners.
1785fn notify_listeners<T>(
1786    responders: impl IntoIterator<Item = oneshot::Sender<T>>,
1787    result: impl Fn() -> T,
1788) {
1789    for r in responders {
1790        // We don't care if the listener disappeared.
1791        let _ = r.send(result());
1792    }
1793}
1794
1795/// Receive all currently enqueued messages from a task command channel.
1796///
1797/// If no messages are initially enqueued, this function blocks until messages become available or
1798/// the channel is closed.
1799///
1800/// If the `out` buffer has a large amount of free capacity at the end of this operation, it is
1801/// shrunk to reclaim some of its memory.
1802///
1803/// # Cancel safety
1804///
1805/// This function is cancel safe. It only awaits `UnboundedReceiver::recv`, which is itself cancel
1806/// safe.
1807async fn recv_all_commands<T>(rx: &mut mpsc::UnboundedReceiver<T>, out: &mut Vec<T>) {
1808    if let Some(msg) = rx.recv().await {
1809        out.push(msg);
1810    } else {
1811        return; // channel closed
1812    };
1813
1814    out.reserve(rx.len());
1815    while let Ok(msg) = rx.try_recv() {
1816        out.push(msg);
1817    }
1818
1819    // We may have the opportunity to reclaim allocated memory.
1820    // Given that `push` will at most double the capacity when the vector is more than half full,
1821    // and we want to avoid entering into a resizing cycle, we choose to only shrink if the
1822    // vector's length is less than one fourth of its capacity.
1823    if out.capacity() > out.len() * 4 {
1824        out.shrink_to_fit();
1825    }
1826}
1827
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use super::*;
    use itertools::Itertools;
    use mz_repr::{Datum, Row};
    use mz_storage_client::client::StatusUpdate;
    use mz_storage_client::healthcheck::{
        MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC,
    };

    const ERROR_MESSAGE: &str = "error message";
    const HINT: &str = "hint message";

    /// Packs a `Dropped` status update for user object 1 into a row.
    fn dropped_status_row(
        error: Option<&str>,
        hints: BTreeSet<String>,
        namespaced_errors: BTreeMap<String, String>,
    ) -> Row {
        Row::from(StatusUpdate {
            id: GlobalId::User(1),
            timestamp: chrono::offset::Utc::now(),
            status: Status::Dropped,
            error: error.map(String::from),
            hints,
            namespaced_errors,
            replica_id: None,
        })
    }

    /// A hint set holding only [`HINT`].
    fn single_hint() -> BTreeSet<String> {
        BTreeSet::from([HINT.to_string()])
    }

    /// A namespaced-error map holding only `thing -> error`.
    fn single_namespaced_error() -> BTreeMap<String, String> {
        BTreeMap::from([("thing".to_string(), "error".to_string())])
    }

    /// Checks that `row` fits both status history schemas and that its id, status, and error
    /// columns hold the expected values.
    fn assert_status_columns(row: &Row, expected_error: Datum<'_>) {
        for (datum, column_type) in row.iter().zip_eq(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of_sql(column_type));
        }

        for (datum, column_type) in row
            .iter()
            .zip_eq(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types())
        {
            assert!(datum.is_instance_of_sql(column_type));
        }

        assert_eq!(
            row.iter().nth(1).unwrap(),
            Datum::String(&GlobalId::User(1).to_string())
        );
        assert_eq!(
            row.iter().nth(2).unwrap(),
            Datum::String(Status::Dropped.to_str())
        );
        assert_eq!(row.iter().nth(3).unwrap(), expected_error);
    }

    /// Collects the entries of the details map stored in column 4 of `row`.
    fn detail_entries(row: &Row) -> Vec<(&str, Datum<'_>)> {
        row.iter().nth(4).unwrap().unwrap_map().iter().collect()
    }

    /// Checks that `entry` is the `hints` entry and that its first hint is [`HINT`].
    fn assert_hints_entry(entry: &(&str, Datum<'_>)) {
        assert_eq!(entry.0, "hints");
        assert_eq!(
            entry.1.unwrap_list().iter().next().unwrap(),
            Datum::String(HINT)
        );
    }

    /// Checks that `entry` is the `namespaced` entry and that it starts with `thing -> error`.
    fn assert_namespaced_entry(entry: &(&str, Datum<'_>)) {
        assert_eq!(entry.0, "namespaced");
        assert_eq!(
            entry.1.unwrap_map().iter().next().unwrap(),
            ("thing", Datum::String("error"))
        );
    }

    #[mz_ore::test]
    fn test_row() {
        let row = dropped_status_row(Some(ERROR_MESSAGE), single_hint(), BTreeMap::new());
        assert_status_columns(&row, Datum::String(ERROR_MESSAGE));

        let entries = detail_entries(&row);
        assert_eq!(entries.len(), 1);
        assert_hints_entry(&entries[0]);
    }

    #[mz_ore::test]
    fn test_row_without_hint() {
        let row = dropped_status_row(Some(ERROR_MESSAGE), BTreeSet::new(), BTreeMap::new());
        assert_status_columns(&row, Datum::String(ERROR_MESSAGE));

        // Without hints or namespaced errors there are no details at all.
        assert_eq!(row.iter().nth(4).unwrap(), Datum::Null);
    }

    #[mz_ore::test]
    fn test_row_without_error() {
        let row = dropped_status_row(None, single_hint(), BTreeMap::new());
        assert_status_columns(&row, Datum::Null);

        let entries = detail_entries(&row);
        assert_eq!(entries.len(), 1);
        assert_hints_entry(&entries[0]);
    }

    #[mz_ore::test]
    fn test_row_with_namespaced() {
        let row = dropped_status_row(
            Some(ERROR_MESSAGE),
            BTreeSet::new(),
            single_namespaced_error(),
        );
        assert_status_columns(&row, Datum::String(ERROR_MESSAGE));

        let entries = detail_entries(&row);
        assert_eq!(entries.len(), 1);
        assert_namespaced_entry(&entries[0]);
    }

    #[mz_ore::test]
    fn test_row_with_everything() {
        let row = dropped_status_row(
            Some(ERROR_MESSAGE),
            single_hint(),
            single_namespaced_error(),
        );
        assert_status_columns(&row, Datum::String(ERROR_MESSAGE));

        let entries = detail_entries(&row);
        assert_eq!(entries.len(), 2);
        // These are always sorted
        assert_hints_entry(&entries[0]);
        assert_namespaced_entry(&entries[1]);
    }
}