// mz_storage_controller/collection_mgmt.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Tokio tasks (and support machinery) for maintaining storage-managed
//! collections.
//!
//! We differentiate between append-only collections and differential
//! collections. The intent is that knowing the type allows being more
//! intentional about what state we keep in memory and how we work when in
//! read-only mode / during zero-downtime upgrades.
//!
//! ## Append-only collections
//!
//! Writers only append blind writes. Those writes never fail. It does not
//! matter at what timestamp they happen (to a degree, but ...).
//!
//! While in read-only mode, the append-only write task can immediately write
//! updates out as batches, but only append them when going out of read-only
//! mode.
//!
//! ## Differential collections
//!
//! These are very similar to the self-correcting persist_sink. We have an
//! in-memory desired state and continually make it so that persist matches
//! desired. As described below (in the task implementation), we could do this
//! in a memory efficient way by keeping open a persist read handle and
//! continually updating/consolidating our desired collection. This way, we
//! would be memory-efficient even in read-only mode.
//!
//! This is an evolution of the current design where, on startup, we bring the
//! persist collection into a known state (mostly by retracting everything) and
//! then assume that this `envd` is the only writer. We panic when that is ever
//! not the case, which we notice when the upper of a collection changes
//! unexpectedly. With this new design we can instead continually update our
//! view of the persist shard and emit updates when needed, when desired
//! changed.
//!
//! NOTE: As it is, we always keep all of desired in memory. Only when told to
//! go out of read-only mode would we start attempting to write.
//!
//! ## Read-only mode
//!
//! When [`CollectionManager`] is in read-only mode it cannot write out to
//! persist. It will, however, maintain the `desired` state of differential
//! collections so that we can immediately start writing out updates when going
//! out of read-only mode.
//!
//! For append-only collections we either panic, in the case of
//! [`CollectionManager::blind_write`], or report back a
//! [`StorageError::ReadOnly`] when trying to append through a
//! [`MonotonicAppender`] returned from
//! [`CollectionManager::monotonic_appender`].
60use std::any::Any;
61use std::cmp::Reverse;
62use std::collections::{BTreeMap, BinaryHeap};
63use std::fmt::Debug;
64use std::ops::ControlFlow;
65use std::pin::Pin;
66use std::str::FromStr;
67use std::sync::atomic::{AtomicU64, Ordering};
68use std::sync::{Arc, Mutex};
69
70use anyhow::{anyhow, bail};
71use chrono::{DateTime, Utc};
72use differential_dataflow::consolidation;
73use differential_dataflow::lattice::Lattice;
74use futures::future::BoxFuture;
75use futures::stream::StreamExt;
76use futures::{Future, FutureExt};
77use mz_cluster_client::ReplicaId;
78use mz_dyncfg::ConfigSet;
79use mz_ore::now::{EpochMillis, NowFn};
80use mz_ore::retry::Retry;
81use mz_ore::soft_panic_or_log;
82use mz_ore::task::AbortOnDropHandle;
83use mz_persist_client::batch::Added;
84use mz_persist_client::read::ReadHandle;
85use mz_persist_client::write::WriteHandle;
86use mz_persist_types::Codec64;
87use mz_repr::adt::timestamp::CheckedTimestamp;
88use mz_repr::{ColumnName, Diff, GlobalId, Row, TimestampManipulation};
89use mz_storage_client::client::{AppendOnlyUpdate, Status, TimestamplessUpdate};
90use mz_storage_client::controller::{IntrospectionType, MonotonicAppender, StorageWriteOp};
91use mz_storage_client::healthcheck::{
92    MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_METRICS_HISTORY_DESC,
93    WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC, WALLCLOCK_LAG_HISTORY_DESC,
94};
95use mz_storage_client::metrics::StorageControllerMetrics;
96use mz_storage_client::statistics::ControllerSinkStatistics;
97use mz_storage_client::storage_collections::StorageCollections;
98use mz_storage_types::StorageDiff;
99use mz_storage_types::controller::InvalidUpper;
100use mz_storage_types::dyncfgs::{
101    REPLICA_METRICS_HISTORY_RETENTION_INTERVAL, WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL,
102    WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL,
103};
104use mz_storage_types::parameters::{
105    STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT, StorageParameters,
106};
107use mz_storage_types::sources::SourceData;
108use timely::progress::{Antichain, Timestamp};
109use tokio::sync::{mpsc, oneshot, watch};
110use tokio::time::{Duration, Instant};
111use tracing::{debug, error, info};
112
113use crate::{
114    StatusHistoryDesc, StatusHistoryRetentionPolicy, StorageError, collection_mgmt,
115    privatelink_status_history_desc, replica_status_history_desc, sink_status_history_desc,
116    snapshot_statistics, source_status_history_desc, statistics,
117};
118
// Default rate at which we advance the uppers of managed collections.
const DEFAULT_TICK_MS: u64 = 1_000;

/// A channel for sending writes to a differential collection.
type DifferentialWriteChannel<T> =
    mpsc::UnboundedSender<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>;

/// A channel for sending writes to an append-only collection.
type AppendOnlyWriteChannel<T> = mpsc::UnboundedSender<(
    Vec<AppendOnlyUpdate>,
    oneshot::Sender<Result<(), StorageError<T>>>,
)>;

/// Handle to a per-collection background write task; aborts the task when dropped.
type WriteTask = AbortOnDropHandle<()>;
/// One-shot channel used to ask a write task to shut down gracefully.
type ShutdownSender = oneshot::Sender<()>;
134
/// Types of storage-managed/introspection collections:
///
/// Append-only: Only accepts blind writes, writes that can be applied at any
/// timestamp and don’t depend on current collection contents.
///
/// Pseudo append-only: We treat them largely as append-only collections but
/// periodically (currently on bootstrap) retract old updates from them.
///
/// Differential: at any given time `t` , collection contents mirrors some
/// (small cardinality) state. The cardinality of the collection stays constant
/// if the thing that is mirrored doesn’t change in cardinality. At steady
/// state, updates always come in pairs of retractions/additions.
pub enum CollectionManagerKind {
    /// Blind writes only; includes pseudo append-only collections.
    AppendOnly,
    /// Maintained by reconciling a desired state against persist contents.
    Differential,
}
151
#[derive(Debug, Clone)]
pub struct CollectionManager<T>
where
    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
    /// When a [`CollectionManager`] is in read-only mode it must not affect any
    /// changes to external state.
    read_only: bool,

    // WIP: Name TBD! I thought about `managed_collections`, `ivm_collections`,
    // `self_correcting_collections`.
    /// These are collections that we write to by adding/removing updates to an
    /// internal _desired_ collection. The `CollectionManager` continually makes
    /// sure that collection contents (in persist) match the desired state.
    differential_collections:
        Arc<Mutex<BTreeMap<GlobalId, (DifferentialWriteChannel<T>, WriteTask, ShutdownSender)>>>,

    /// Collections that we only append to using blind-writes.
    ///
    /// Every write succeeds at _some_ timestamp, and we never check what the
    /// actual contents of the collection (in persist) are.
    append_only_collections:
        Arc<Mutex<BTreeMap<GlobalId, (AppendOnlyWriteChannel<T>, WriteTask, ShutdownSender)>>>,

    /// Amount of time we'll wait before sending a batch of inserts to Persist, for user
    /// collections.
    user_batch_duration_ms: Arc<AtomicU64>,
    /// Clock used for bumping uppers and timestamping writes.
    now: NowFn,
}
181
/// The `CollectionManager` provides two complementary functions:
/// - Providing an API to append values to a registered set of collections.
///   For this use case:
///     - The `CollectionManager` expects to be the only writer.
///     - Appending to a closed collection panics
/// - Automatically advancing the timestamp of managed collections every
///   second. For this use case:
///     - The `CollectionManager` handles contention by permitting and ignoring errors.
///     - Closed collections will not panic if they continue receiving these requests.
impl<T> CollectionManager<T>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    /// Creates a new [`CollectionManager`] with no registered collections and
    /// the default user batch duration.
    pub(super) fn new(read_only: bool, now: NowFn) -> CollectionManager<T> {
        let batch_duration_ms: u64 = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT
            .as_millis()
            .try_into()
            .expect("known to fit");

        CollectionManager {
            read_only,
            differential_collections: Arc::new(Mutex::new(BTreeMap::new())),
            append_only_collections: Arc::new(Mutex::new(BTreeMap::new())),
            user_batch_duration_ms: Arc::new(AtomicU64::new(batch_duration_ms)),
            now,
        }
    }

    /// Updates the duration we'll wait to batch events for user owned collections.
    pub fn update_user_batch_duration(&self, duration: Duration) {
        tracing::info!(?duration, "updating user batch duration");
        // Saturate rather than fail if the configured duration overflows u64 millis.
        let millis: u64 = duration.as_millis().try_into().unwrap_or(u64::MAX);
        self.user_batch_duration_ms.store(millis, Ordering::Relaxed);
    }

    /// Registers a new _differential collection_.
    ///
    /// The [CollectionManager] will automatically advance the upper of every
    /// registered collection every second.
    ///
    /// Update the `desired` state of a differential collection using
    /// [Self::differential_write].
    pub(super) fn register_differential_collection<R>(
        &self,
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        read_handle_fn: R,
        force_writable: bool,
        introspection_config: DifferentialIntrospectionConfig<T>,
    ) where
        R: FnMut() -> Pin<
                Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>,
            > + Send
            + Sync
            + 'static,
    {
        let mut guard = self
            .differential_collections
            .lock()
            .expect("collection_mgmt panicked");

        // Check if this collection is already registered.
        if let Some((_writer, task, _shutdown_tx)) = guard.get(&id) {
            // The collection is already registered and the task is still running so nothing to do.
            if !task.is_finished() {
                // TODO(parkmycar): Panic here if we never see this error in production.
                tracing::error!("Registered a collection twice! {id:?}");
                return;
            }
        }

        let read_only = self.get_read_only(id, force_writable);

        // Spawns a new task so we can write to this collection.
        let writer_and_handle = DifferentialWriteTask::spawn(
            id,
            write_handle,
            read_handle_fn,
            read_only,
            self.now.clone(),
            introspection_config,
        );
        let prev = guard.insert(id, writer_and_handle);

        // Double check the previous task was actually finished.
        if let Some((_, prev_task, _)) = prev {
            assert!(
                prev_task.is_finished(),
                "should only spawn a new task if the previous is finished"
            );
        }
    }

    /// Registers a new _append-only collection_.
    ///
    /// The [CollectionManager] will automatically advance the upper of every
    /// registered collection every second.
    pub(super) fn register_append_only_collection(
        &self,
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        force_writable: bool,
        introspection_config: Option<AppendOnlyIntrospectionConfig<T>>,
    ) {
        let mut guard = self
            .append_only_collections
            .lock()
            .expect("collection_mgmt panicked");

        // Check if this collection is already registered.
        if let Some((_writer, task, _shutdown_tx)) = guard.get(&id) {
            // The collection is already registered and the task is still running so nothing to do.
            if !task.is_finished() {
                // TODO(parkmycar): Panic here if we never see this error in production.
                tracing::error!("Registered a collection twice! {id:?}");
                return;
            }
        }

        let read_only = self.get_read_only(id, force_writable);

        // Spawns a new task so we can write to this collection.
        let writer_and_handle = AppendOnlyWriteTask::spawn(
            id,
            write_handle,
            read_only,
            self.now.clone(),
            Arc::clone(&self.user_batch_duration_ms),
            introspection_config,
        );
        let prev = guard.insert(id, writer_and_handle);

        // Double check the previous task was actually finished.
        if let Some((_, prev_task, _)) = prev {
            assert!(
                prev_task.is_finished(),
                "should only spawn a new task if the previous is finished"
            );
        }
    }

    /// Unregisters the given collection.
    ///
    /// Also waits until the `CollectionManager` has completed all outstanding work to ensure that
    /// it has stopped referencing the provided `id`.
    #[mz_ore::instrument(level = "debug")]
    pub(super) fn unregister_collection(&self, id: GlobalId) -> BoxFuture<'static, ()> {
        let prev = self
            .differential_collections
            .lock()
            .expect("CollectionManager panicked")
            .remove(&id);

        // Wait for the task to complete before reporting as unregistered.
        if let Some((_prev_writer, prev_task, shutdown_tx)) = prev {
            // Notify the task it needs to shutdown.
            //
            // We can ignore errors here because they indicate the task is already done.
            let _ = shutdown_tx.send(());
            return Box::pin(prev_task.map(|_| ()));
        }

        let prev = self
            .append_only_collections
            .lock()
            .expect("CollectionManager panicked")
            .remove(&id);

        // Wait for the task to complete before reporting as unregistered.
        if let Some((_prev_writer, prev_task, shutdown_tx)) = prev {
            // Notify the task it needs to shutdown.
            //
            // We can ignore errors here because they indicate the task is already done.
            let _ = shutdown_tx.send(());
            return Box::pin(prev_task.map(|_| ()));
        }

        // `id` was not registered at all; nothing to wait for.
        Box::pin(futures::future::ready(()))
    }

    /// Returns a sender for writes to the given append-only collection.
    ///
    /// # Panics
    /// - If `id` does not belong to an append-only collection.
    pub(super) fn append_only_write_sender(&self, id: GlobalId) -> AppendOnlyWriteChannel<T> {
        let collections = self.append_only_collections.lock().expect("poisoned");
        match collections.get(&id) {
            Some((tx, _, _)) => tx.clone(),
            None => panic!("missing append-only collection: {id}"),
        }
    }

    /// Returns a sender for writes to the given differential collection.
    ///
    /// # Panics
    /// - If `id` does not belong to a differential collection.
    pub(super) fn differential_write_sender(&self, id: GlobalId) -> DifferentialWriteChannel<T> {
        let collections = self.differential_collections.lock().expect("poisoned");
        match collections.get(&id) {
            Some((tx, _, _)) => tx.clone(),
            None => panic!("missing differential collection: {id}"),
        }
    }

    /// Appends `updates` to the append-only collection identified by `id`, at
    /// _some_ timestamp. Does not wait for the append to complete.
    ///
    /// # Panics
    /// - If `id` does not belong to an append-only collection.
    /// - If this [`CollectionManager`] is in read-only mode.
    /// - If the collection closed.
    pub(super) fn blind_write(&self, id: GlobalId, updates: Vec<AppendOnlyUpdate>) {
        if self.read_only {
            panic!("attempting blind write to {} while in read-only mode", id);
        }

        if updates.is_empty() {
            return;
        }

        let collections = self.append_only_collections.lock().expect("poisoned");
        match collections.get(&id) {
            Some((update_tx, _, _)) => {
                // Fire-and-forget: we drop `_rx`, so the result of the append is never observed.
                let (tx, _rx) = oneshot::channel();
                update_tx.send((updates, tx)).expect("rx hung up");
            }
            None => panic!("missing append-only collection: {id}"),
        }
    }

    /// Updates the desired collection state of the differential collection identified by
    /// `id`. The underlying persist shard will reflect this change at
    /// _some_ point. Does not wait for the change to complete.
    ///
    /// # Panics
    /// - If `id` does not belong to a differential collection.
    /// - If the collection closed.
    pub(super) fn differential_write(&self, id: GlobalId, op: StorageWriteOp) {
        if op.is_empty_append() {
            return;
        }

        let collections = self.differential_collections.lock().expect("poisoned");
        match collections.get(&id) {
            Some((update_tx, _, _)) => {
                // Fire-and-forget: we drop `_rx`, so the result of the write is never observed.
                let (tx, _rx) = oneshot::channel();
                update_tx.send((op, tx)).expect("rx hung up");
            }
            None => panic!("missing differential collection: {id}"),
        }
    }

    /// Appends the given `updates` to the differential collection identified by `id`.
    ///
    /// # Panics
    /// - If `id` does not belong to a differential collection.
    /// - If the collection closed.
    pub(super) fn differential_append(&self, id: GlobalId, updates: Vec<(Row, Diff)>) {
        self.differential_write(id, StorageWriteOp::Append { updates })
    }

    /// Returns a [`MonotonicAppender`] that can be used to monotonically append updates to the
    /// collection correlated with `id`.
    pub(super) fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<T>, StorageError<T>> {
        let guard = self
            .append_only_collections
            .lock()
            .expect("CollectionManager panicked");
        let tx = guard
            .get(&id)
            .map(|(tx, _, _)| tx.clone())
            .ok_or(StorageError::IdentifierMissing(id))?;

        Ok(MonotonicAppender::new(tx))
    }

    /// Resolves the effective read-only flag for a collection: `force_writable`
    /// overrides the manager-wide `read_only` setting, but only for system
    /// collections.
    fn get_read_only(&self, id: GlobalId, force_writable: bool) -> bool {
        if force_writable {
            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
            false
        } else {
            self.read_only
        }
    }
}
470
/// Everything a [`DifferentialWriteTask`] needs to prepare an introspection
/// collection before it starts writing (e.g. seeding statistics scrapers).
pub(crate) struct DifferentialIntrospectionConfig<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    /// A recent upper of the collection, used when snapshotting its contents.
    pub(crate) recent_upper: Antichain<T>,
    /// Which introspection collection this is; determines the preparation work.
    pub(crate) introspection_type: IntrospectionType,
    /// Handle for reading current collection contents.
    pub(crate) storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    /// Manager handed to spawned scrapers so they can write updates back.
    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,
    /// Shared source statistics state, seeded/scraped for statistics collections.
    pub(crate) source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    /// Shared sink statistics state, keyed by (collection, replica).
    pub(crate) sink_statistics:
        Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    /// How often statistics scrapers run.
    pub(crate) statistics_interval: Duration,
    /// Watch channel for dynamic updates to the scraper interval.
    pub(crate) statistics_interval_receiver: watch::Receiver<Duration>,
    /// How long statistics entries are retained.
    pub(crate) statistics_retention_duration: Duration,
    pub(crate) metrics: StorageControllerMetrics,
    /// Tokens that keep spawned scraper tasks alive; dropped with the controller.
    pub(crate) introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,
}
488
/// A task that will make it so that the state in persist matches the desired
/// state and continuously bump the upper for the specified collection.
///
/// NOTE: This implementation is a bit clunky, and could be optimized by not keeping
/// all of desired in memory (see comment below). It is meant to showcase the
/// general approach.
struct DifferentialWriteTask<T, R>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
    R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>>
        + Send
        + 'static,
{
    /// The collection that we are writing to.
    id: GlobalId,

    /// Persist write handle for the collection's shard.
    write_handle: WriteHandle<SourceData, (), T, StorageDiff>,

    /// For getting a [`ReadHandle`] to sync our state to persist contents.
    read_handle_fn: R,

    /// When true, the task must not write to persist (zero-downtime upgrades).
    read_only: bool,

    /// Clock used to pick timestamps when bumping the upper.
    now: NowFn,

    /// In the absence of updates, we regularly bump the upper to "now", on this
    /// interval. This makes it so the collection remains readable at recent
    /// timestamps.
    upper_tick_interval: tokio::time::Interval,

    /// Receiver for write commands. These change our desired state.
    cmd_rx: mpsc::UnboundedReceiver<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>,

    /// We have to shut down when receiving from this.
    shutdown_rx: oneshot::Receiver<()>,

    /// The contents of the collection as it should be according to whoever is
    /// driving us around.
    // This is memory inefficient: we always keep a full copy of
    // desired, so that we can re-derive a to_write if/when someone else
    // writes to persist and we notice because of an upper conflict.
    // This is optimized for the case where we rarely have more than one
    // writer.
    //
    // We can optimize for a multi-writer case by keeping an open
    // ReadHandle and continually reading updates from persist, updating
    // a desired in place. Similar to the self-correcting persist_sink.
    desired: Vec<(Row, Diff)>,

    /// Updates that we have to write when next writing to persist. This is
    /// determined by looking at what is desired and what is in persist.
    to_write: Vec<(Row, Diff)>,

    /// Current upper of the persist shard. We keep track of this so that we
    /// realize when someone else writes to the shard, in which case we have to
    /// update our state of the world, that is update our `to_write` based on
    /// `desired` and the contents of the persist shard.
    current_upper: T,
}
548
549impl<T, R> DifferentialWriteTask<T, R>
550where
551    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
552    R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>>
553        + Send
554        + Sync
555        + 'static,
556{
557    /// Spawns a [`DifferentialWriteTask`] in an [`mz_ore::task`] and returns
558    /// handles for interacting with it.
559    fn spawn(
560        id: GlobalId,
561        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
562        read_handle_fn: R,
563        read_only: bool,
564        now: NowFn,
565        introspection_config: DifferentialIntrospectionConfig<T>,
566    ) -> (DifferentialWriteChannel<T>, WriteTask, ShutdownSender) {
567        let (tx, rx) = mpsc::unbounded_channel();
568        let (shutdown_tx, shutdown_rx) = oneshot::channel();
569
570        let upper_tick_interval = tokio::time::interval(Duration::from_millis(DEFAULT_TICK_MS));
571
572        let current_upper = T::minimum();
573
574        let task = Self {
575            id,
576            write_handle,
577            read_handle_fn,
578            read_only,
579            now,
580            upper_tick_interval,
581            cmd_rx: rx,
582            shutdown_rx,
583            desired: Vec::new(),
584            to_write: Vec::new(),
585            current_upper,
586        };
587
588        let handle = mz_ore::task::spawn(
589            || format!("CollectionManager-differential_write_task-{id}"),
590            async move {
591                if !task.read_only {
592                    task.prepare(introspection_config).await;
593                }
594                let res = task.run().await;
595
596                match res {
597                    ControlFlow::Break(reason) => {
598                        info!("write_task-{} ending: {}", id, reason);
599                    }
600                    c => {
601                        unreachable!(
602                            "cannot break out of the loop with a Continue, but got: {:?}",
603                            c
604                        );
605                    }
606                }
607            },
608        );
609
610        (tx, handle.abort_on_drop(), shutdown_tx)
611    }
612
613    /// Does any work that is required before this background task starts
614    /// writing to the given introspection collection.
615    ///
616    /// This might include consolidation, deleting older entries or seeding
617    /// in-memory state of, say, scrapers, with current collection contents.
618    async fn prepare(&self, introspection_config: DifferentialIntrospectionConfig<T>) {
619        tracing::info!(%self.id, ?introspection_config.introspection_type, "preparing differential introspection collection for writes");
620
621        match introspection_config.introspection_type {
622            IntrospectionType::ShardMapping => {
623                // Done by the `append_shard_mappings` call.
624            }
625            IntrospectionType::Frontiers | IntrospectionType::ReplicaFrontiers => {
626                // Differential collections start with an empty
627                // desired state. No need to manually reset.
628            }
629            IntrospectionType::StorageSourceStatistics => {
630                let prev = snapshot_statistics(
631                    self.id,
632                    introspection_config.recent_upper,
633                    &introspection_config.storage_collections,
634                )
635                .await;
636
637                let scraper_token = statistics::spawn_statistics_scraper(
638                    self.id.clone(),
639                    // These do a shallow copy.
640                    introspection_config.collection_manager,
641                    Arc::clone(&introspection_config.source_statistics),
642                    prev,
643                    introspection_config.statistics_interval.clone(),
644                    introspection_config.statistics_interval_receiver.clone(),
645                    introspection_config.statistics_retention_duration,
646                    introspection_config.metrics,
647                );
648                let web_token = statistics::spawn_webhook_statistics_scraper(
649                    introspection_config.source_statistics,
650                    introspection_config.statistics_interval,
651                    introspection_config.statistics_interval_receiver,
652                );
653
654                // Make sure these are dropped when the controller is
655                // dropped, so that the internal task will stop.
656                introspection_config
657                    .introspection_tokens
658                    .lock()
659                    .expect("poisoned")
660                    .insert(self.id, Box::new((scraper_token, web_token)));
661            }
662            IntrospectionType::StorageSinkStatistics => {
663                let prev = snapshot_statistics(
664                    self.id,
665                    introspection_config.recent_upper,
666                    &introspection_config.storage_collections,
667                )
668                .await;
669
670                let scraper_token = statistics::spawn_statistics_scraper(
671                    self.id.clone(),
672                    introspection_config.collection_manager,
673                    Arc::clone(&introspection_config.sink_statistics),
674                    prev,
675                    introspection_config.statistics_interval,
676                    introspection_config.statistics_interval_receiver,
677                    introspection_config.statistics_retention_duration,
678                    introspection_config.metrics,
679                );
680
681                // Make sure this is dropped when the controller is
682                // dropped, so that the internal task will stop.
683                introspection_config
684                    .introspection_tokens
685                    .lock()
686                    .expect("poisoned")
687                    .insert(self.id, scraper_token);
688            }
689
690            IntrospectionType::ComputeDependencies
691            | IntrospectionType::ComputeOperatorHydrationStatus
692            | IntrospectionType::ComputeMaterializedViewRefreshes
693            | IntrospectionType::ComputeErrorCounts
694            | IntrospectionType::ComputeHydrationTimes => {
695                // Differential collections start with an empty
696                // desired state. No need to manually reset.
697            }
698
699            introspection_type @ IntrospectionType::ReplicaMetricsHistory
700            | introspection_type @ IntrospectionType::WallclockLagHistory
701            | introspection_type @ IntrospectionType::WallclockLagHistogram
702            | introspection_type @ IntrospectionType::PreparedStatementHistory
703            | introspection_type @ IntrospectionType::StatementExecutionHistory
704            | introspection_type @ IntrospectionType::SessionHistory
705            | introspection_type @ IntrospectionType::StatementLifecycleHistory
706            | introspection_type @ IntrospectionType::SqlText
707            | introspection_type @ IntrospectionType::SourceStatusHistory
708            | introspection_type @ IntrospectionType::SinkStatusHistory
709            | introspection_type @ IntrospectionType::PrivatelinkConnectionStatusHistory
710            | introspection_type @ IntrospectionType::ReplicaStatusHistory => {
711                unreachable!("not differential collection: {introspection_type:?}")
712            }
713        }
714    }
715
    /// Runs the task's main loop: apply incoming write commands, and in their
    /// absence periodically bump the collection's upper to "now".
    ///
    /// Only ever returns `ControlFlow::Break` with a human-readable reason for
    /// shutting down (graceful shutdown request, or all senders dropped).
    async fn run(mut self) -> ControlFlow<String> {
        // Reused buffer for draining queued commands off the channel.
        let mut updates = Vec::new();
        loop {
            tokio::select! {
                // Prefer sending actual updates over just bumping the upper,
                // because sending updates also bumps the upper.
                biased;

                // Listen for a shutdown signal so we can gracefully cleanup.
                _ = &mut self.shutdown_rx => {
                    self.handle_shutdown();

                    return ControlFlow::Break("graceful shutdown".to_string());
                }

                // Pull a chunk of queued updates off the channel.
                () = recv_all_commands(&mut self.cmd_rx, &mut updates) => {
                    if updates.is_empty() {
                        // Sender has been dropped, which means the collection
                        // should have been unregistered, break out of the run
                        // loop if we weren't already aborted.
                        return ControlFlow::Break("sender has been dropped".to_string());
                    }
                    let _ = self.handle_updates(&mut updates).await?;
                }

                // If we haven't received any updates, then we'll move the upper forward.
                _ = self.upper_tick_interval.tick() => {
                    if self.read_only {
                        // Not bumping uppers while in read-only mode.
                        continue;
                    }
                    let _ = self.tick_upper().await?;
                },
            }
        }
    }
753
754    async fn tick_upper(&mut self) -> ControlFlow<String> {
755        let now = T::from((self.now)());
756
757        if now <= self.current_upper {
758            // Upper is already further along than current wall-clock time, no
759            // need to bump it.
760            return ControlFlow::Continue(());
761        }
762
763        assert!(!self.read_only);
764        let res = self
765            .write_handle
766            .compare_and_append_batch(
767                &mut [],
768                Antichain::from_elem(self.current_upper.clone()),
769                Antichain::from_elem(now.clone()),
770                true,
771            )
772            .await
773            .expect("valid usage");
774        match res {
775            // All good!
776            Ok(()) => {
777                tracing::debug!(%self.id, "bumped upper of differential collection");
778                self.current_upper = now;
779            }
780            Err(err) => {
781                // Someone else wrote to the collection or bumped the upper. We
782                // need to sync to latest persist state and potentially patch up
783                // our `to_write`, based on what we learn and `desired`.
784
785                let actual_upper = if let Some(ts) = err.current.as_option() {
786                    ts.clone()
787                } else {
788                    return ControlFlow::Break("upper is the empty antichain".to_string());
789                };
790
791                tracing::info!(%self.id, ?actual_upper, expected_upper = ?self.current_upper, "upper mismatch while bumping upper, syncing to persist state");
792
793                self.current_upper = actual_upper;
794
795                self.sync_to_persist().await;
796            }
797        }
798
799        ControlFlow::Continue(())
800    }
801
802    fn handle_shutdown(&mut self) {
803        let mut senders = Vec::new();
804
805        // Prevent new messages from being sent.
806        self.cmd_rx.close();
807
808        // Get as many waiting senders as possible.
809        while let Ok((_batch, sender)) = self.cmd_rx.try_recv() {
810            senders.push(sender);
811        }
812
813        // Notify them that this collection is closed.
814        //
815        // Note: if a task is shutting down, that indicates the source has been
816        // dropped, at which point the identifier is invalid. Returning this
817        // error provides a better user experience.
818        notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));
819    }
820
821    async fn handle_updates(
822        &mut self,
823        batch: &mut Vec<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>,
824    ) -> ControlFlow<String> {
825        // Put in place _some_ rate limiting.
826        let batch_duration_ms = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT;
827
828        let use_batch_now = Instant::now();
829        let min_time_to_complete = use_batch_now + batch_duration_ms;
830
831        tracing::debug!(
832            ?use_batch_now,
833            ?batch_duration_ms,
834            ?min_time_to_complete,
835            "batch duration",
836        );
837
838        let mut responders = Vec::with_capacity(batch.len());
839        for (op, tx) in batch.drain(..) {
840            self.apply_write_op(op);
841            responders.push(tx);
842        }
843
844        // TODO: Maybe don't do it every time?
845        consolidation::consolidate(&mut self.desired);
846        consolidation::consolidate(&mut self.to_write);
847
848        // Reset the interval which is used to periodically bump the uppers
849        // because the uppers will get bumped with the following update.
850        // This makes it such that we will write at most once every
851        // `interval`.
852        //
853        // For example, let's say our `DEFAULT_TICK` interval is 10, so at
854        // `t + 10`, `t + 20`, ... we'll bump the uppers. If we receive an
855        // update at `t + 3` we want to shift this window so we bump the
856        // uppers at `t + 13`, `t + 23`, ... which resetting the interval
857        // accomplishes.
858        self.upper_tick_interval.reset();
859
860        self.write_to_persist(responders).await?;
861
862        // Wait until our artificial latency has completed.
863        //
864        // Note: if writing to persist took longer than `DEFAULT_TICK` this
865        // await will resolve immediately.
866        tokio::time::sleep_until(min_time_to_complete).await;
867
868        ControlFlow::Continue(())
869    }
870
871    /// Apply the given write operation to the `desired`/`to_write` state.
872    fn apply_write_op(&mut self, op: StorageWriteOp) {
873        match op {
874            StorageWriteOp::Append { updates } => {
875                self.desired.extend_from_slice(&updates);
876                self.to_write.extend(updates);
877            }
878            StorageWriteOp::Delete { filter } => {
879                let to_delete = self.desired.extract_if(.., |(row, _)| filter(row));
880                let retractions = to_delete.map(|(row, diff)| (row, -diff));
881                self.to_write.extend(retractions);
882            }
883        }
884    }
885
    /// Attempt to write what is currently in [Self::to_write] to persist,
    /// retrying and re-syncing to persist when necessary, that is when the
    /// upper was not what we expected.
    ///
    /// All `responders` are notified with `Ok(())` on success, or with an
    /// `InvalidUppers` error when the retry budget is exhausted. Returns
    /// `ControlFlow::Break` only when persist reports the empty antichain
    /// as the current upper.
    async fn write_to_persist(
        &mut self,
        responders: Vec<oneshot::Sender<Result<(), StorageError<T>>>>,
    ) -> ControlFlow<String> {
        if self.read_only {
            tracing::debug!(%self.id, "not writing to differential collection: read-only");
            // Not attempting to write while in read-only mode.
            //
            // NOTE(review): `responders` are dropped here without being
            // notified, so callers observe a canceled oneshot rather than
            // an explicit read-only error — confirm this is intended.
            return ControlFlow::Continue(());
        }

        // We'll try really hard to succeed, but eventually stop.
        //
        // Note: it's very rare we should ever need to retry, and if we need to
        // retry it should only take 1 or 2 attempts. We set `max_tries` to be
        // high though because if we hit some edge case we want to try hard to
        // commit the data.
        let retries = Retry::default()
            .initial_backoff(Duration::from_secs(1))
            .clamp_backoff(Duration::from_secs(3))
            .factor(1.25)
            .max_tries(20)
            .into_retry_stream();
        let mut retries = Box::pin(retries);

        loop {
            // Append updates to persist!
            let now = T::from((self.now)());
            let new_upper = std::cmp::max(
                now,
                TimestampManipulation::step_forward(&self.current_upper),
            );

            // All pending updates are written at the current upper, with the
            // upper advancing to `new_upper` in the same append.
            let updates_to_write = self
                .to_write
                .iter()
                .map(|(row, diff)| {
                    (
                        (SourceData(Ok(row.clone())), ()),
                        self.current_upper.clone(),
                        diff.into_inner(),
                    )
                })
                .collect::<Vec<_>>();

            assert!(!self.read_only);
            let res = self
                .write_handle
                .compare_and_append(
                    updates_to_write,
                    Antichain::from_elem(self.current_upper.clone()),
                    Antichain::from_elem(new_upper.clone()),
                )
                .await
                .expect("valid usage");
            match res {
                // Everything was successful!
                Ok(()) => {
                    // Notify all of our listeners.
                    notify_listeners(responders, || Ok(()));

                    self.current_upper = new_upper;

                    // Very important! This is empty at steady state, while
                    // desired keeps an in-memory copy of desired state.
                    self.to_write.clear();

                    tracing::debug!(%self.id, "appended to differential collection");

                    // Break out of the retry loop so we can wait for more data.
                    break;
                }
                // The upper was not what we expected.
                Err(err) => {
                    // Someone else wrote to the collection. We need to read
                    // from persist and update to_write based on that and the
                    // desired state.
                    let actual_upper = if let Some(ts) = err.current.as_option() {
                        ts.clone()
                    } else {
                        return ControlFlow::Break("upper is the empty antichain".to_string());
                    };

                    tracing::info!(%self.id, ?actual_upper, expected_upper = ?self.current_upper, "retrying append for differential collection");

                    // We've exhausted all of our retries, notify listeners and
                    // break out of the retry loop so we can wait for more data.
                    if retries.next().await.is_none() {
                        let invalid_upper = InvalidUpper {
                            id: self.id,
                            current_upper: err.current,
                        };
                        notify_listeners(responders, || {
                            Err(StorageError::InvalidUppers(vec![invalid_upper.clone()]))
                        });
                        error!(
                            "exhausted retries when appending to managed collection {}",
                            self.id
                        );
                        break;
                    }

                    self.current_upper = actual_upper;

                    self.sync_to_persist().await;

                    debug!(
                        "Retrying invalid-uppers error while appending to differential collection {}",
                        self.id
                    );
                }
            }
        }

        ControlFlow::Continue(())
    }
1004
1005    /// Re-derives [Self::to_write] by looking at [Self::desired] and the
1006    /// current state in persist. We want to insert everything in desired and
1007    /// retract everything in persist. But ideally most of that cancels out in
1008    /// consolidation.
1009    ///
1010    /// To be called when a `compare_and_append` failed because the upper didn't
1011    /// match what we expected.
1012    async fn sync_to_persist(&mut self) {
1013        let mut read_handle = (self.read_handle_fn)().await;
1014        let as_of = self
1015            .current_upper
1016            .step_back()
1017            .unwrap_or_else(|| T::minimum());
1018        let as_of = Antichain::from_elem(as_of);
1019        let snapshot = read_handle.snapshot_and_fetch(as_of).await;
1020
1021        let mut negated_oks = match snapshot {
1022            Ok(contents) => {
1023                let mut snapshot = Vec::with_capacity(contents.len());
1024                for ((data, _), _, diff) in contents {
1025                    let row = data.0.unwrap();
1026                    snapshot.push((row, -Diff::from(diff)));
1027                }
1028                snapshot
1029            }
1030            Err(_) => panic!("read before since"),
1031        };
1032
1033        self.to_write.clear();
1034        self.to_write.extend(self.desired.iter().cloned());
1035        self.to_write.append(&mut negated_oks);
1036        consolidation::consolidate(&mut self.to_write);
1037    }
1038}
1039
/// Configuration needed to prepare an append-only introspection collection
/// before its write task starts, e.g. for truncating history collections.
pub(crate) struct AppendOnlyIntrospectionConfig<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    /// Which introspection collection this configuration is for.
    pub(crate) introspection_type: IntrospectionType,
    /// Dynamic configuration; passed along when truncating metrics history.
    pub(crate) config_set: Arc<ConfigSet>,
    /// Storage parameters; used to derive the status-history collection
    /// descriptions during truncation.
    pub(crate) parameters: StorageParameters,
    /// Handle to storage collections; passed to the truncation helpers.
    pub(crate) storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
}
1049
/// A task that writes to an append only collection and continuously bumps the upper for the specified
/// collection.
///
/// For status history collections, this task can deduplicate redundant [`Statuses`](Status).
struct AppendOnlyWriteTask<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    /// The collection that we are writing to.
    id: GlobalId,
    /// Persist write handle for the collection.
    write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
    /// While `true`, the task neither appends updates nor bumps the upper.
    read_only: bool,
    /// Source of wall-clock time, used to pick append timestamps.
    now: NowFn,
    /// Batch duration (in milliseconds) applied to user collections; controls
    /// the artificial latency added between appends.
    user_batch_duration_ms: Arc<AtomicU64>,
    /// Receiver for write commands.
    rx: mpsc::UnboundedReceiver<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<T>>>,
    )>,

    /// We have to shut down when receiving from this.
    shutdown_rx: oneshot::Receiver<()>,
    /// If this collection deduplicates statuses, this map is used to track the previous status.
    previous_statuses: Option<BTreeMap<(GlobalId, Option<ReplicaId>), Status>>,
}
1075
1076impl<T> AppendOnlyWriteTask<T>
1077where
1078    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
1079{
    /// Spawns an [`AppendOnlyWriteTask`] in an [`mz_ore::task`] that will continuously bump the
    /// upper for the specified collection,
    /// and append data that is sent via the provided [`mpsc::UnboundedSender`].
    ///
    /// Returns the command channel, the (abort-on-drop) task handle, and the
    /// shutdown sender.
    ///
    /// TODO(parkmycar): One day if we want to customize the tick interval for each collection, that
    /// should be done here.
    /// TODO(parkmycar): Maybe add prometheus metrics for each collection?
    fn spawn(
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        read_only: bool,
        now: NowFn,
        user_batch_duration_ms: Arc<AtomicU64>,
        introspection_config: Option<AppendOnlyIntrospectionConfig<T>>,
    ) -> (AppendOnlyWriteChannel<T>, WriteTask, ShutdownSender) {
        let (tx, rx) = mpsc::unbounded_channel();
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        // Only the source/sink status history collections deduplicate
        // statuses; all other append-only collections get no
        // `previous_statuses` map. The match is exhaustive on purpose so that
        // newly added introspection types must be classified here.
        let previous_statuses: Option<BTreeMap<(GlobalId, Option<ReplicaId>), Status>> =
            match introspection_config
                .as_ref()
                .map(|config| config.introspection_type)
            {
                Some(IntrospectionType::SourceStatusHistory)
                | Some(IntrospectionType::SinkStatusHistory) => Some(BTreeMap::new()),

                Some(IntrospectionType::ReplicaMetricsHistory)
                | Some(IntrospectionType::WallclockLagHistory)
                | Some(IntrospectionType::WallclockLagHistogram)
                | Some(IntrospectionType::PrivatelinkConnectionStatusHistory)
                | Some(IntrospectionType::ReplicaStatusHistory)
                | Some(IntrospectionType::PreparedStatementHistory)
                | Some(IntrospectionType::StatementExecutionHistory)
                | Some(IntrospectionType::SessionHistory)
                | Some(IntrospectionType::StatementLifecycleHistory)
                | Some(IntrospectionType::SqlText)
                | None => None,

                // These introspection types are not append-only collections
                // and must never be handed to this task.
                Some(introspection_type @ IntrospectionType::ShardMapping)
                | Some(introspection_type @ IntrospectionType::Frontiers)
                | Some(introspection_type @ IntrospectionType::ReplicaFrontiers)
                | Some(introspection_type @ IntrospectionType::StorageSourceStatistics)
                | Some(introspection_type @ IntrospectionType::StorageSinkStatistics)
                | Some(introspection_type @ IntrospectionType::ComputeDependencies)
                | Some(introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus)
                | Some(introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes)
                | Some(introspection_type @ IntrospectionType::ComputeErrorCounts)
                | Some(introspection_type @ IntrospectionType::ComputeHydrationTimes) => {
                    unreachable!("not append-only collection: {introspection_type:?}")
                }
            };

        let mut task = Self {
            id,
            write_handle,
            rx,
            shutdown_rx,
            read_only,
            now,
            user_batch_duration_ms,
            previous_statuses,
        };

        let handle = mz_ore::task::spawn(
            || format!("CollectionManager-append_only_write_task-{id}"),
            async move {
                // Preparation writes to the collection (e.g. truncation), so
                // it is skipped while in read-only mode.
                if !task.read_only {
                    task.prepare(introspection_config).await;
                }
                task.run().await;
            },
        );

        (tx, handle.abort_on_drop(), shutdown_tx)
    }
1155
    /// Does any work that is required before the background task starts
    /// writing to the given append only introspection collection.
    ///
    /// This might include consolidation or deleting older entries.
    ///
    /// For status-history collections, this also reads out the most recent
    /// status per id, which seeds `previous_statuses` for deduplication.
    async fn prepare(&mut self, introspection_config: Option<AppendOnlyIntrospectionConfig<T>>) {
        let Some(AppendOnlyIntrospectionConfig {
            introspection_type,
            config_set,
            parameters,
            storage_collections,
        }) = introspection_config
        else {
            // Not an introspection collection: nothing to prepare.
            return;
        };
        let initial_statuses = match introspection_type {
            // Metrics history collections: truncate old entries (the exact
            // policy lives in `partially_truncate_metrics_history`). No
            // statuses to seed.
            IntrospectionType::ReplicaMetricsHistory
            | IntrospectionType::WallclockLagHistory
            | IntrospectionType::WallclockLagHistogram => {
                let result = partially_truncate_metrics_history(
                    self.id,
                    introspection_type,
                    &mut self.write_handle,
                    config_set,
                    self.now.clone(),
                    storage_collections,
                )
                .await;
                if let Err(error) = result {
                    soft_panic_or_log!(
                        "error truncating metrics history: {error} (type={introspection_type:?})"
                    );
                }
                Vec::new()
            }

            // Status history collections that don't deduplicate: truncate,
            // but discard the last-status information.
            IntrospectionType::PrivatelinkConnectionStatusHistory => {
                partially_truncate_status_history(
                    self.id,
                    IntrospectionType::PrivatelinkConnectionStatusHistory,
                    &mut self.write_handle,
                    privatelink_status_history_desc(&parameters),
                    self.now.clone(),
                    &storage_collections,
                )
                .await;
                Vec::new()
            }
            IntrospectionType::ReplicaStatusHistory => {
                partially_truncate_status_history(
                    self.id,
                    IntrospectionType::ReplicaStatusHistory,
                    &mut self.write_handle,
                    replica_status_history_desc(&parameters),
                    self.now.clone(),
                    &storage_collections,
                )
                .await;
                Vec::new()
            }

            // Note [btv] - we don't truncate these, because that uses
            // a huge amount of memory on environmentd startup.
            IntrospectionType::PreparedStatementHistory
            | IntrospectionType::StatementExecutionHistory
            | IntrospectionType::SessionHistory
            | IntrospectionType::StatementLifecycleHistory
            | IntrospectionType::SqlText => {
                // NOTE(aljoscha): We never remove from these
                // collections. Someone, at some point needs to
                // think about that! Issue:
                // https://github.com/MaterializeInc/database-issues/issues/7666
                Vec::new()
            }

            // Deduplicating status history collections: truncate, then parse
            // the latest status per id to seed `previous_statuses`.
            IntrospectionType::SourceStatusHistory => {
                let last_status_per_id = partially_truncate_status_history(
                    self.id,
                    IntrospectionType::SourceStatusHistory,
                    &mut self.write_handle,
                    source_status_history_desc(&parameters),
                    self.now.clone(),
                    &storage_collections,
                )
                .await;

                // Index of the `status` column in the collection's schema.
                let status_col = MZ_SOURCE_STATUS_HISTORY_DESC
                    .get_by_name(&ColumnName::from("status"))
                    .expect("schema has not changed")
                    .0;

                last_status_per_id
                    .into_iter()
                    .map(|(id, row)| {
                        (
                            id,
                            Status::from_str(
                                row.iter()
                                    .nth(status_col)
                                    .expect("schema has not changed")
                                    .unwrap_str(),
                            )
                            .expect("statuses must be uncorrupted"),
                        )
                    })
                    .collect()
            }
            IntrospectionType::SinkStatusHistory => {
                let last_status_per_id = partially_truncate_status_history(
                    self.id,
                    IntrospectionType::SinkStatusHistory,
                    &mut self.write_handle,
                    sink_status_history_desc(&parameters),
                    self.now.clone(),
                    &storage_collections,
                )
                .await;

                // Index of the `status` column in the collection's schema.
                let status_col = MZ_SINK_STATUS_HISTORY_DESC
                    .get_by_name(&ColumnName::from("status"))
                    .expect("schema has not changed")
                    .0;

                last_status_per_id
                    .into_iter()
                    .map(|(id, row)| {
                        (
                            id,
                            Status::from_str(
                                row.iter()
                                    .nth(status_col)
                                    .expect("schema has not changed")
                                    .unwrap_str(),
                            )
                            .expect("statuses must be uncorrupted"),
                        )
                    })
                    .collect()
            }

            // These introspection types are not append-only collections and
            // must never reach this task.
            introspection_type @ IntrospectionType::ShardMapping
            | introspection_type @ IntrospectionType::Frontiers
            | introspection_type @ IntrospectionType::ReplicaFrontiers
            | introspection_type @ IntrospectionType::StorageSourceStatistics
            | introspection_type @ IntrospectionType::StorageSinkStatistics
            | introspection_type @ IntrospectionType::ComputeDependencies
            | introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus
            | introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes
            | introspection_type @ IntrospectionType::ComputeErrorCounts
            | introspection_type @ IntrospectionType::ComputeHydrationTimes => {
                unreachable!("not append-only collection: {introspection_type:?}")
            }
        };
        // Seed the deduplication map (if this collection has one) with the
        // statuses read above.
        if let Some(previous_statuses) = &mut self.previous_statuses {
            previous_statuses.extend(initial_statuses);
        }
    }
1312
    /// Main loop of the append-only write task: services shutdown signals,
    /// queued appends, and periodic upper ticks until the command channel
    /// closes or shutdown is requested.
    async fn run(mut self) {
        let mut interval = tokio::time::interval(Duration::from_millis(DEFAULT_TICK_MS));

        let mut batch: Vec<(Vec<_>, _)> = Vec::new();

        'run: loop {
            tokio::select! {
                // Prefer sending actual updates over just bumping the upper, because sending
                // updates also bumps the upper.
                biased;

                // Listen for a shutdown signal so we can gracefully cleanup.
                _ = &mut self.shutdown_rx => {
                    let mut senders = Vec::new();

                    // Prevent new messages from being sent.
                    self.rx.close();

                    // Get as many waiting senders as possible.
                    while let Ok((_batch, sender)) = self.rx.try_recv() {
                        senders.push(sender);
                    }

                    // Notify them that this collection is closed.
                    //
                    // Note: if a task is shutting down, that indicates the source has been
                    // dropped, at which point the identifier is invalid. Returning this
                    // error provides a better user experience.
                    notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));

                    break 'run;
                }

                // Pull a chunk of queued updates off the channel.
                () = recv_all_commands(&mut self.rx, &mut batch) => {
                    if batch.is_empty() {
                        // Sender has been dropped, which means the collection should have been
                        // unregistered, break out of the run loop if we weren't already
                        // aborted.
                        break 'run;
                    }

                    // To rate limit appends to persist we add artificial latency, and will
                    // finish no sooner than this instant. User collections use the
                    // dynamically configurable batch duration; others the default.
                    let batch_duration_ms = match self.id {
                        GlobalId::User(_) => Duration::from_millis(
                            self.user_batch_duration_ms.load(Ordering::Relaxed),
                        ),
                        // For non-user collections, always just use the default.
                        _ => STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT,
                    };
                    let use_batch_now = Instant::now();
                    let min_time_to_complete = use_batch_now + batch_duration_ms;

                    tracing::debug!(
                        ?use_batch_now,
                        ?batch_duration_ms,
                        ?min_time_to_complete,
                        "batch duration",
                    );

                    // Reset the interval which is used to periodically bump the uppers
                    // because the uppers will get bumped with the following update. This
                    // makes it such that we will write at most once every `interval`.
                    //
                    // For example, let's say our `DEFAULT_TICK` interval is 10, so at
                    // `t + 10`, `t + 20`, ... we'll bump the uppers. If we receive an
                    // update at `t + 3` we want to shift this window so we bump the uppers
                    // at `t + 13`, `t + 23`, ... which resetting the interval accomplishes.
                    interval.reset();

                    let capacity: usize = batch
                        .iter()
                        .map(|(rows, _)| rows.len())
                        .sum();
                    let mut all_rows = Vec::with_capacity(capacity);
                    let mut responders = Vec::with_capacity(batch.len());

                    // Deduplicate statuses (where applicable) and convert the
                    // updates into timestampless rows/diffs.
                    for (updates, responder) in batch.drain(..) {
                        let rows = self.process_updates(updates);

                        all_rows.extend(
                            rows.map(|(row, diff)| TimestamplessUpdate { row, diff }),
                        );
                        responders.push(responder);
                    }

                    if self.read_only {
                        tracing::warn!(%self.id, ?all_rows, "append while in read-only mode");
                        notify_listeners(responders, || Err(StorageError::ReadOnly));
                        continue;
                    }

                    // Append updates to persist!
                    let at_least = T::from((self.now)());

                    if !all_rows.is_empty() {
                        monotonic_append(&mut self.write_handle, all_rows, at_least).await;
                    }
                    // Notify all of our listeners.
                    notify_listeners(responders, || Ok(()));

                    // Wait until our artificial latency has completed.
                    //
                    // Note: if writing to persist took longer than `DEFAULT_TICK` this
                    // await will resolve immediately.
                    tokio::time::sleep_until(min_time_to_complete).await;
                }

                // If we haven't received any updates, then we'll move the upper forward.
                _ = interval.tick() => {
                    if self.read_only {
                        // Not bumping uppers while in read-only mode.
                        continue;
                    }

                    // Update our collection.
                    let now = T::from((self.now)());
                    let updates = vec![];
                    let at_least = now.clone();

                    // Failures don't matter when advancing collections' uppers. This might
                    // fail when a clusterd happens to be writing to this concurrently.
                    // Advancing uppers here is best-effort and only needs to succeed if no
                    // one else is advancing it; contention proves otherwise.
                    monotonic_append(&mut self.write_handle, updates, at_least).await;
                },
            }
        }

        info!("write_task-{} ending", self.id);
    }
1445
    /// Deduplicate any [`mz_storage_client::client::StatusUpdate`] within `updates` and converts
    /// `updates` to rows and diffs.
    ///
    /// Deduplication only happens when `self.previous_statuses` is `Some`, i.e.
    /// when this task tracks per-object status history. A status update is then
    /// retained only if there is no previously recorded status for its
    /// `(id, replica_id)` key, or if the recorded status is superseded by the
    /// new one (per `superseded_by` — NOTE(review): presumed semantics from the
    /// name; confirm against its impl). Plain `Row` updates always pass through.
    fn process_updates(
        &mut self,
        updates: Vec<AppendOnlyUpdate>,
    ) -> impl Iterator<Item = (Row, Diff)> {
        let updates = if let Some(previous_statuses) = &mut self.previous_statuses {
            let new: Vec<_> = updates
                .into_iter()
                .filter(|r| match r {
                    // Non-status rows are never deduplicated.
                    AppendOnlyUpdate::Row(_) => true,
                    AppendOnlyUpdate::Status(update) => {
                        match (
                            previous_statuses
                                .get(&(update.id, update.replica_id))
                                .as_deref(),
                            &update.status,
                        ) {
                            // First status we've seen for this key: keep it.
                            (None, _) => true,
                            // Otherwise keep it only if it supersedes the
                            // previously recorded status.
                            (Some(old), new) => old.superseded_by(*new),
                        }
                    }
                })
                .collect();
            // Record the surviving status updates as the new "previous"
            // statuses for the next round of deduplication.
            previous_statuses.extend(new.iter().filter_map(|update| match update {
                AppendOnlyUpdate::Row(_) => None,
                AppendOnlyUpdate::Status(update) => {
                    Some(((update.id, update.replica_id), update.status))
                }
            }));
            new
        } else {
            updates
        };

        updates.into_iter().map(AppendOnlyUpdate::into_row)
    }
1483}
1484
1485/// Truncates the given metrics history by removing all entries older than that history's
1486/// configured retention interval.
1487///
1488/// # Panics
1489///
1490/// Panics if `collection` is not a metrics history.
1491async fn partially_truncate_metrics_history<T>(
1492    id: GlobalId,
1493    introspection_type: IntrospectionType,
1494    write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
1495    config_set: Arc<ConfigSet>,
1496    now: NowFn,
1497    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
1498) -> Result<(), anyhow::Error>
1499where
1500    T: Codec64 + From<EpochMillis> + TimestampManipulation,
1501{
1502    let (keep_duration, occurred_at_col) = match introspection_type {
1503        IntrospectionType::ReplicaMetricsHistory => (
1504            REPLICA_METRICS_HISTORY_RETENTION_INTERVAL.get(&config_set),
1505            REPLICA_METRICS_HISTORY_DESC
1506                .get_by_name(&ColumnName::from("occurred_at"))
1507                .expect("schema has not changed")
1508                .0,
1509        ),
1510        IntrospectionType::WallclockLagHistory => (
1511            WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL.get(&config_set),
1512            WALLCLOCK_LAG_HISTORY_DESC
1513                .get_by_name(&ColumnName::from("occurred_at"))
1514                .expect("schema has not changed")
1515                .0,
1516        ),
1517        IntrospectionType::WallclockLagHistogram => (
1518            WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL.get(&config_set),
1519            WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC
1520                .get_by_name(&ColumnName::from("period_start"))
1521                .expect("schema has not changed")
1522                .0,
1523        ),
1524        _ => panic!("not a metrics history: {introspection_type:?}"),
1525    };
1526
1527    let upper = write_handle.fetch_recent_upper().await;
1528    let Some(upper_ts) = upper.as_option() else {
1529        bail!("collection is sealed");
1530    };
1531    let Some(as_of_ts) = upper_ts.step_back() else {
1532        return Ok(()); // nothing to truncate
1533    };
1534
1535    let mut rows = storage_collections
1536        .snapshot_cursor(id, as_of_ts)
1537        .await
1538        .map_err(|e| anyhow!("reading snapshot: {e:?}"))?;
1539
1540    let now = mz_ore::now::to_datetime(now());
1541    let keep_since = now - keep_duration;
1542
1543    // It is very important that we append our retractions at the timestamp
1544    // right after the timestamp at which we got our snapshot. Otherwise,
1545    // it's possible for someone else to sneak in retractions or other
1546    // unexpected changes.
1547    let old_upper_ts = upper_ts.clone();
1548    let new_upper_ts = TimestampManipulation::step_forward(&old_upper_ts);
1549
1550    // Produce retractions by inverting diffs of rows we want to delete.
1551    let mut builder = write_handle.builder(Antichain::from_elem(old_upper_ts.clone()));
1552    while let Some(chunk) = rows.next().await {
1553        for (data, _t, diff) in chunk {
1554            let Ok(row) = &data.0 else { continue };
1555            let datums = row.unpack();
1556            let occurred_at = datums[occurred_at_col].unwrap_timestamptz();
1557            if *occurred_at >= keep_since {
1558                continue;
1559            }
1560            let diff = -diff;
1561            match builder.add(&data, &(), &old_upper_ts, &diff).await? {
1562                Added::Record => {}
1563                Added::RecordAndParts => {
1564                    debug!(?id, "added part to builder");
1565                }
1566            }
1567        }
1568    }
1569
1570    let mut updates = builder
1571        .finish(Antichain::from_elem(new_upper_ts.clone()))
1572        .await?;
1573    let mut batches = vec![&mut updates];
1574
1575    write_handle
1576        .compare_and_append_batch(
1577            batches.as_mut_slice(),
1578            Antichain::from_elem(old_upper_ts),
1579            Antichain::from_elem(new_upper_ts),
1580            true,
1581        )
1582        .await
1583        .expect("valid usage")
1584        .map_err(|e| anyhow!("appending retractions: {e:?}"))
1585}
1586
/// Effectively truncates the status history shard based on its retention policy.
///
/// NOTE: The history collections are really append-only collections, but
/// every-now-and-then we want to retract old updates so that the collection
/// does not grow unboundedly. Crucially, these are _not_ incremental
/// collections, they are not derived from a state at some time `t` and we
/// cannot maintain a desired state for them.
///
/// Supports two retention policies (see `StatusHistoryRetentionPolicy`):
/// keeping only the last `n` entries per key, or keeping only entries within
/// a trailing time window. The resulting retractions are appended
/// best-effort: if the shard's upper moved concurrently, the append is
/// abandoned and only logged.
///
/// Returns a map with latest unpacked row per key.
pub(crate) async fn partially_truncate_status_history<T, K>(
    id: GlobalId,
    introspection_type: IntrospectionType,
    write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
    status_history_desc: StatusHistoryDesc<K>,
    now: NowFn,
    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
) -> BTreeMap<K, Row>
where
    T: Codec64 + From<EpochMillis> + TimestampManipulation,
    K: Clone + Debug + Ord + Send + Sync,
{
    let upper = write_handle.fetch_recent_upper().await.clone();

    // Take a snapshot at the latest readable time, i.e. one step back from
    // the current upper.
    let mut rows = match upper.as_option() {
        Some(f) if f > &T::minimum() => {
            let as_of = f.step_back().unwrap();

            storage_collections
                .snapshot_cursor(id, as_of)
                .await
                .expect("snapshot succeeds")
        }
        // If collection is closed or the frontier is the minimum, we cannot
        // or don't need to truncate (respectively).
        _ => return BTreeMap::new(),
    };

    // BTreeMap to keep track of the row with the latest timestamp for each key.
    let mut latest_row_per_key: BTreeMap<K, (CheckedTimestamp<DateTime<Utc>>, Row)> =
        BTreeMap::new();

    // It is very important that we append our retractions at the timestamp
    // right after the timestamp at which we got our snapshot. Otherwise,
    // it's possible for someone else to sneak in retractions or other
    // unexpected changes.
    let expected_upper = upper.into_option().expect("checked above");
    let new_upper = TimestampManipulation::step_forward(&expected_upper);

    // Batch builder that accumulates the retractions to append.
    let mut deletions = write_handle.builder(Antichain::from_elem(expected_upper.clone()));

    // Per-row bookkeeping shared by both retention policies: extracts the key
    // and event time from a row and records the latest row seen for each key.
    let mut handle_row = {
        let latest_row_per_key = &mut latest_row_per_key;
        move |row: &Row, diff| {
            let datums = row.unpack();
            let key = (status_history_desc.extract_key)(&datums);
            let timestamp = (status_history_desc.extract_time)(&datums);

            // The snapshot is expected to be consolidated; a non-positive
            // diff indicates a bug somewhere upstream.
            assert!(
                diff > 0,
                "only know how to operate over consolidated data with diffs > 0, \
                    found diff {diff} for object {key:?} in {introspection_type:?}",
            );

            // Keep track of the timestamp of the latest row per key.
            match latest_row_per_key.get(&key) {
                Some(existing) if &existing.0 > &timestamp => {}
                _ => {
                    latest_row_per_key.insert(key.clone(), (timestamp, row.clone()));
                }
            };
            (key, timestamp)
        }
    };

    match status_history_desc.retention_policy {
        StatusHistoryRetentionPolicy::LastN(n) => {
            // BTreeMap to track the earliest events for each key.
            // `Reverse` turns `BinaryHeap`'s max-heap into a min-heap, so
            // `pop` yields the oldest retained entry for the key.
            let mut last_n_entries_per_key: BTreeMap<
                K,
                BinaryHeap<Reverse<(CheckedTimestamp<DateTime<Utc>>, Row)>>,
            > = BTreeMap::new();

            while let Some(chunk) = rows.next().await {
                for (data, _t, diff) in chunk {
                    let Ok(row) = &data.0 else { continue };
                    let (key, timestamp) = handle_row(row, diff);

                    // Duplicate rows ARE possible if many status changes happen in VERY quick succession,
                    // so we handle duplicated rows separately.
                    let entries = last_n_entries_per_key.entry(key).or_default();
                    for _ in 0..diff {
                        // We CAN have multiple statuses (most likely Starting and Running) at the exact same
                        // millisecond, depending on how the `health_operator` is scheduled.
                        //
                        // Note that these will be arbitrarily ordered, so a Starting event might
                        // survive and a Running one won't. The next restart will remove the other,
                        // so we don't bother being careful about it.
                        //
                        // TODO(guswynn): unpack these into health-status objects and use
                        // their `Ord` impl.
                        entries.push(Reverse((timestamp, row.clone())));

                        // Retain some number of entries, using pop to mark the oldest entries for
                        // deletion.
                        while entries.len() > n {
                            if let Some(Reverse((_, r))) = entries.pop() {
                                deletions
                                    .add(&SourceData(Ok(r)), &(), &expected_upper, &-1)
                                    .await
                                    .expect("usage should be valid");
                            }
                        }
                    }
                }
            }
        }
        StatusHistoryRetentionPolicy::TimeWindow(time_window) => {
            // Get the lower bound of our retention window
            let now = mz_ore::now::to_datetime(now());
            let keep_since = now - time_window;

            // Mark any row outside the retention window for deletion
            while let Some(chunk) = rows.next().await {
                for (data, _t, diff) in chunk {
                    let Ok(row) = &data.0 else { continue };
                    let (_, timestamp) = handle_row(row, diff);

                    if *timestamp < keep_since {
                        deletions
                            .add(&data, &(), &expected_upper, &-1)
                            .await
                            .expect("usage should be valid");
                    }
                }
            }
        }
    }

    let mut updates = deletions
        .finish(Antichain::from_elem(new_upper.clone()))
        .await
        .expect("expected valid usage");
    let mut batches = vec![&mut updates];

    // Updates are only deletes because everything else is already in the shard.
    let res = write_handle
        .compare_and_append_batch(
            batches.as_mut_slice(),
            Antichain::from_elem(expected_upper.clone()),
            Antichain::from_elem(new_upper),
            true,
        )
        .await
        .expect("usage was valid");

    match res {
        Ok(_) => {
            // All good, yay!
        }
        Err(err) => {
            // This is fine, it just means the upper moved because
            // of continual upper advancement or because someone
            // already appended some more retractions/updates.
            //
            // NOTE: We might want to attempt these partial
            // retractions on an interval, instead of only when
            // starting up!
            info!(
                %id, ?expected_upper, current_upper = ?err.current,
                "failed to append partial truncation",
            );
        }
    }

    latest_row_per_key
        .into_iter()
        .map(|(key, (_, row))| (key, row))
        .collect()
}
1766
1767async fn monotonic_append<T: Timestamp + Lattice + Codec64 + TimestampManipulation>(
1768    write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
1769    updates: Vec<TimestamplessUpdate>,
1770    at_least: T,
1771) {
1772    let mut expected_upper = write_handle.shared_upper();
1773    loop {
1774        if updates.is_empty() && expected_upper.is_empty() {
1775            // Ignore timestamp advancement for
1776            // closed collections. TODO? Make this a
1777            // correctable error
1778            return;
1779        }
1780
1781        let upper = expected_upper
1782            .into_option()
1783            .expect("cannot append data to closed collection");
1784
1785        let lower = if upper.less_than(&at_least) {
1786            at_least.clone()
1787        } else {
1788            upper.clone()
1789        };
1790
1791        let new_upper = TimestampManipulation::step_forward(&lower);
1792        let updates = updates
1793            .iter()
1794            .map(|TimestamplessUpdate { row, diff }| {
1795                (
1796                    (SourceData(Ok(row.clone())), ()),
1797                    lower.clone(),
1798                    diff.into_inner(),
1799                )
1800            })
1801            .collect::<Vec<_>>();
1802        let res = write_handle
1803            .compare_and_append(
1804                updates,
1805                Antichain::from_elem(upper),
1806                Antichain::from_elem(new_upper),
1807            )
1808            .await
1809            .expect("valid usage");
1810        match res {
1811            Ok(()) => return,
1812            Err(err) => {
1813                expected_upper = err.current;
1814                continue;
1815            }
1816        }
1817    }
1818}
1819
1820// Helper method for notifying listeners.
1821fn notify_listeners<T>(
1822    responders: impl IntoIterator<Item = oneshot::Sender<T>>,
1823    result: impl Fn() -> T,
1824) {
1825    for r in responders {
1826        // We don't care if the listener disappeared.
1827        let _ = r.send(result());
1828    }
1829}
1830
1831/// Receive all currently enqueued messages from a task command channel.
1832///
1833/// If no messages are initially enqueued, this function blocks until messages become available or
1834/// the channel is closed.
1835///
1836/// If the `out` buffer has a large amount of free capacity at the end of this operation, it is
1837/// shrunk to reclaim some of its memory.
1838///
1839/// # Cancel safety
1840///
1841/// This function is cancel safe. It only awaits `UnboundedReceiver::recv`, which is itself cancel
1842/// safe.
1843async fn recv_all_commands<T>(rx: &mut mpsc::UnboundedReceiver<T>, out: &mut Vec<T>) {
1844    if let Some(msg) = rx.recv().await {
1845        out.push(msg);
1846    } else {
1847        return; // channel closed
1848    };
1849
1850    out.reserve(rx.len());
1851    while let Ok(msg) = rx.try_recv() {
1852        out.push(msg);
1853    }
1854
1855    // We may have the opportunity to reclaim allocated memory.
1856    // Given that `push` will at most double the capacity when the vector is more than half full,
1857    // and we want to avoid entering into a resizing cycle, we choose to only shrink if the
1858    // vector's length is less than one fourth of its capacity.
1859    if out.capacity() > out.len() * 4 {
1860        out.shrink_to_fit();
1861    }
1862}
1863
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use super::*;
    use itertools::Itertools;
    use mz_repr::{Datum, Row};
    use mz_storage_client::client::StatusUpdate;
    use mz_storage_client::healthcheck::{
        MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC,
    };

    /// Asserts the invariants shared by all `StatusUpdate` row tests:
    ///
    /// * every datum is an instance of the corresponding column type in both
    ///   the sink and the source status history schemas, and
    /// * the `id`, `status`, and `error` columns (columns 1-3) carry the
    ///   expected values (`None` means the error column must be `Null`).
    fn assert_status_row(row: &Row, id: GlobalId, status: Status, error: Option<&str>) {
        for (datum, column_type) in row.iter().zip_eq(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of_sql(column_type));
        }
        for (datum, column_type) in row
            .iter()
            .zip_eq(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types())
        {
            assert!(datum.is_instance_of_sql(column_type));
        }

        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
        match error {
            Some(error) => assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error)),
            None => assert_eq!(row.iter().nth(3).unwrap(), Datum::Null),
        }
    }

    /// Returns the entries of the `details` map (column 4) of a status row.
    ///
    /// Panics if the column is not a map (e.g. when it is `Null`).
    fn details(row: &Row) -> Vec<(&str, Datum<'_>)> {
        row.iter()
            .nth(4)
            .unwrap()
            .unwrap_map()
            .iter()
            .collect()
    }

    #[mz_ore::test]
    fn test_row() {
        let error_message = "error message";
        let hint = "hint message";
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let row = Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: Some(error_message.to_string()),
            hints: BTreeSet::from([hint.to_string()]),
            namespaced_errors: Default::default(),
            replica_id: None,
        });

        assert_status_row(&row, id, status, Some(error_message));

        let details = details(&row);
        assert_eq!(details.len(), 1);
        let hint_datum = &details[0];

        assert_eq!(hint_datum.0, "hints");
        assert_eq!(
            hint_datum.1.unwrap_list().iter().next().unwrap(),
            Datum::String(hint)
        );
    }

    #[mz_ore::test]
    fn test_row_without_hint() {
        let error_message = "error message";
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let row = Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: Some(error_message.to_string()),
            hints: Default::default(),
            namespaced_errors: Default::default(),
            replica_id: None,
        });

        assert_status_row(&row, id, status, Some(error_message));
        // With neither hints nor namespaced errors, the details column is Null.
        assert_eq!(row.iter().nth(4).unwrap(), Datum::Null);
    }

    #[mz_ore::test]
    fn test_row_without_error() {
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let hint = "hint message";
        let row = Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: None,
            hints: BTreeSet::from([hint.to_string()]),
            namespaced_errors: Default::default(),
            replica_id: None,
        });

        assert_status_row(&row, id, status, None);

        let details = details(&row);
        assert_eq!(details.len(), 1);
        let hint_datum = &details[0];

        assert_eq!(hint_datum.0, "hints");
        assert_eq!(
            hint_datum.1.unwrap_list().iter().next().unwrap(),
            Datum::String(hint)
        );
    }

    #[mz_ore::test]
    fn test_row_with_namespaced() {
        let error_message = "error message";
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let row = Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: Some(error_message.to_string()),
            hints: Default::default(),
            namespaced_errors: BTreeMap::from([("thing".to_string(), "error".to_string())]),
            replica_id: None,
        });

        assert_status_row(&row, id, status, Some(error_message));

        let details = details(&row);
        assert_eq!(details.len(), 1);
        let ns_datum = &details[0];

        assert_eq!(ns_datum.0, "namespaced");
        assert_eq!(
            ns_datum.1.unwrap_map().iter().next().unwrap(),
            ("thing", Datum::String("error"))
        );
    }

    #[mz_ore::test]
    fn test_row_with_everything() {
        let error_message = "error message";
        let hint = "hint message";
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let row = Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: Some(error_message.to_string()),
            hints: BTreeSet::from([hint.to_string()]),
            namespaced_errors: BTreeMap::from([("thing".to_string(), "error".to_string())]),
            replica_id: None,
        });

        assert_status_row(&row, id, status, Some(error_message));

        let details = details(&row);
        assert_eq!(details.len(), 2);
        // These are always sorted
        let hint_datum = &details[0];
        let ns_datum = &details[1];

        assert_eq!(hint_datum.0, "hints");
        assert_eq!(
            hint_datum.1.unwrap_list().iter().next().unwrap(),
            Datum::String(hint)
        );

        assert_eq!(ns_datum.0, "namespaced");
        assert_eq!(
            ns_datum.1.unwrap_map().iter().next().unwrap(),
            ("thing", Datum::String("error"))
        );
    }
}