use std::any::Any;
use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap};
use std::fmt::Debug;
use std::ops::ControlFlow;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};

use anyhow::{anyhow, bail};
use chrono::{DateTime, Utc};
use differential_dataflow::consolidation;
use differential_dataflow::lattice::Lattice;
use futures::future::BoxFuture;
use futures::stream::StreamExt;
use futures::{Future, FutureExt};
use mz_cluster_client::ReplicaId;
use mz_dyncfg::ConfigSet;
use mz_ore::now::{EpochMillis, NowFn};
use mz_ore::retry::Retry;
use mz_ore::soft_panic_or_log;
use mz_ore::task::AbortOnDropHandle;
use mz_ore::vec::VecExt;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::Codec64;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{ColumnName, Diff, GlobalId, Row, TimestampManipulation};
use mz_storage_client::client::{AppendOnlyUpdate, Status, TimestamplessUpdate};
use mz_storage_client::controller::{IntrospectionType, MonotonicAppender, StorageWriteOp};
use mz_storage_client::healthcheck::{
    MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_METRICS_HISTORY_DESC,
    WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC, WALLCLOCK_LAG_HISTORY_DESC,
};
use mz_storage_client::metrics::StorageControllerMetrics;
use mz_storage_client::statistics::{SinkStatisticsUpdate, SourceStatisticsUpdate};
use mz_storage_client::storage_collections::StorageCollections;
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::InvalidUpper;
use mz_storage_types::dyncfgs::{
    REPLICA_METRICS_HISTORY_RETENTION_INTERVAL, WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL,
    WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL,
};
use mz_storage_types::parameters::{
    STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT, StorageParameters,
};
use mz_storage_types::sources::SourceData;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{mpsc, oneshot, watch};
use tokio::time::{Duration, Instant};
use tracing::{debug, error, info};

use crate::{
    StatusHistoryDesc, StatusHistoryRetentionPolicy, StorageError, collection_mgmt,
    privatelink_status_history_desc, replica_status_history_desc, sink_status_history_desc,
    snapshot_statistics, source_status_history_desc, statistics,
};

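// Interval, in milliseconds, at which the write tasks tick and proactively
// bump the upper of their collection even when no new writes arrive.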
const DEFAULT_TICK_MS: u64 = 1_000;

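/// A channel for sending write ops to a differential write task, paired with
/// a sender on which the result of the write is reported back to the caller.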
type DifferentialWriteChannel<T> =
    mpsc::UnboundedSender<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>;

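/// A channel for sending updates to an append-only write task, paired with a
/// sender on which the result of the write is reported back to the caller.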
type AppendOnlyWriteChannel<T> = mpsc::UnboundedSender<(
    Vec<AppendOnlyUpdate>,
    oneshot::Sender<Result<(), StorageError<T>>>,
)>;

type WriteTask = AbortOnDropHandle<()>;
type ShutdownSender = oneshot::Sender<()>;

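/// The kind of write task that manages a collection.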
pub enum CollectionManagerKind {
    AppendOnly,
    Differential,
}

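/// Spawns and manages one background write task per registered collection.
///
/// Differential collections are kept in sync with a desired state: callers
/// submit [`StorageWriteOp`]s and the write task computes the retractions
/// needed to reconcile the persist shard. Append-only collections only ever
/// receive blind appends.
///
/// A minimal usage sketch (the `id`, `row`, `diff`, and `now_fn` values here
/// are hypothetical placeholders, not part of this module):
///
/// ```ignore
/// let manager = CollectionManager::new(/* read_only */ false, now_fn);
/// // ... register `id` as a differential collection ...
/// manager.differential_append(id, vec![(row, diff)]);
/// ```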
#[derive(Debug, Clone)]
pub struct CollectionManager<T>
where
    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
    read_only: bool,

    differential_collections:
        Arc<Mutex<BTreeMap<GlobalId, (DifferentialWriteChannel<T>, WriteTask, ShutdownSender)>>>,

    append_only_collections:
        Arc<Mutex<BTreeMap<GlobalId, (AppendOnlyWriteChannel<T>, WriteTask, ShutdownSender)>>>,

    user_batch_duration_ms: Arc<AtomicU64>,
    now: NowFn,
}

impl<T> CollectionManager<T>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    pub(super) fn new(read_only: bool, now: NowFn) -> CollectionManager<T> {
        let batch_duration_ms: u64 = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT
            .as_millis()
            .try_into()
            .expect("known to fit");

        CollectionManager {
            read_only,
            differential_collections: Arc::new(Mutex::new(BTreeMap::new())),
            append_only_collections: Arc::new(Mutex::new(BTreeMap::new())),
            user_batch_duration_ms: Arc::new(AtomicU64::new(batch_duration_ms)),
            now,
        }
    }

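    /// Updates the duration for which writes to user collections are batched.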
    pub fn update_user_batch_duration(&self, duration: Duration) {
        tracing::info!(?duration, "updating user batch duration");
        let millis: u64 = duration.as_millis().try_into().unwrap_or(u64::MAX);
        self.user_batch_duration_ms.store(millis, Ordering::Relaxed);
    }

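    /// Registers a differential collection and spawns a write task for it.
    ///
    /// Re-registering an id whose task is still running logs an error and
    /// leaves the existing task in place.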
    pub(super) fn register_differential_collection<R>(
        &self,
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        read_handle_fn: R,
        force_writable: bool,
        introspection_config: DifferentialIntrospectionConfig<T>,
    ) where
        R: FnMut() -> Pin<
                Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>,
            > + Send
            + Sync
            + 'static,
    {
        let mut guard = self
            .differential_collections
            .lock()
            .expect("collection_mgmt panicked");

        if let Some((_writer, task, _shutdown_tx)) = guard.get(&id) {
            if !task.is_finished() {
                tracing::error!("Registered a collection twice! {id:?}");
                return;
            }
        }

        let read_only = self.get_read_only(id, force_writable);

        let writer_and_handle = DifferentialWriteTask::spawn(
            id,
            write_handle,
            read_handle_fn,
            read_only,
            self.now.clone(),
            introspection_config,
        );
        let prev = guard.insert(id, writer_and_handle);

        if let Some((_, prev_task, _)) = prev {
            assert!(
                prev_task.is_finished(),
                "should only spawn a new task if the previous is finished"
            );
        }
    }

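    /// Registers an append-only collection and spawns a write task for it.
    ///
    /// Re-registering an id whose task is still running logs an error and
    /// leaves the existing task in place.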
    pub(super) fn register_append_only_collection(
        &self,
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        force_writable: bool,
        introspection_config: Option<AppendOnlyIntrospectionConfig<T>>,
    ) {
        let mut guard = self
            .append_only_collections
            .lock()
            .expect("collection_mgmt panicked");

        if let Some((_writer, task, _shutdown_tx)) = guard.get(&id) {
            if !task.is_finished() {
                tracing::error!("Registered a collection twice! {id:?}");
                return;
            }
        }

        let read_only = self.get_read_only(id, force_writable);

        let writer_and_handle = AppendOnlyWriteTask::spawn(
            id,
            write_handle,
            read_only,
            self.now.clone(),
            Arc::clone(&self.user_batch_duration_ms),
            introspection_config,
        );
        let prev = guard.insert(id, writer_and_handle);

        if let Some((_, prev_task, _)) = prev {
            assert!(
                prev_task.is_finished(),
                "should only spawn a new task if the previous is finished"
            );
        }
    }

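    /// Unregisters the given collection, signalling its write task to shut
    /// down. The returned future resolves once the task has stopped.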
    #[mz_ore::instrument(level = "debug")]
    pub(super) fn unregister_collection(&self, id: GlobalId) -> BoxFuture<'static, ()> {
        let prev = self
            .differential_collections
            .lock()
            .expect("CollectionManager panicked")
            .remove(&id);

        if let Some((_prev_writer, prev_task, shutdown_tx)) = prev {
            let _ = shutdown_tx.send(());
            return Box::pin(prev_task.map(|_| ()));
        }

        let prev = self
            .append_only_collections
            .lock()
            .expect("CollectionManager panicked")
            .remove(&id);

        if let Some((_prev_writer, prev_task, shutdown_tx)) = prev {
            let _ = shutdown_tx.send(());
            return Box::pin(prev_task.map(|_| ()));
        }

        Box::pin(futures::future::ready(()))
    }

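    /// Returns the write channel of the given append-only collection.
    ///
    /// Panics if the collection is not registered as append-only.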
    pub(super) fn append_only_write_sender(&self, id: GlobalId) -> AppendOnlyWriteChannel<T> {
        let collections = self.append_only_collections.lock().expect("poisoned");
        match collections.get(&id) {
            Some((tx, _, _)) => tx.clone(),
            None => panic!("missing append-only collection: {id}"),
        }
    }

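    /// Returns the write channel of the given differential collection.
    ///
    /// Panics if the collection is not registered as differential.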
    pub(super) fn differential_write_sender(&self, id: GlobalId) -> DifferentialWriteChannel<T> {
        let collections = self.differential_collections.lock().expect("poisoned");
        match collections.get(&id) {
            Some((tx, _, _)) => tx.clone(),
            None => panic!("missing differential collection: {id}"),
        }
    }

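    /// Appends the given updates to the append-only collection without
    /// waiting for the result.
    ///
    /// Panics in read-only mode or if the collection is not registered.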
    pub(super) fn blind_write(&self, id: GlobalId, updates: Vec<AppendOnlyUpdate>) {
        if self.read_only {
            panic!("attempting blind write to {} while in read-only mode", id);
        }

        if !updates.is_empty() {
            let update_tx = self.append_only_write_sender(id);
            let (tx, _rx) = oneshot::channel();
            update_tx.send((updates, tx)).expect("rx hung up");
        }
    }

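    /// Submits a write op to the differential collection without waiting for
    /// the result. Empty appends are silently dropped.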
    pub(super) fn differential_write(&self, id: GlobalId, op: StorageWriteOp) {
        if !op.is_empty_append() {
            let update_tx = self.differential_write_sender(id);
            let (tx, _rx) = oneshot::channel();
            update_tx.send((op, tx)).expect("rx hung up");
        }
    }

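    /// Appends the given updates to the differential collection.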
    pub(super) fn differential_append(&self, id: GlobalId, updates: Vec<(Row, Diff)>) {
        self.differential_write(id, StorageWriteOp::Append { updates })
    }

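    /// Returns a [`MonotonicAppender`] for the given append-only collection.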
    pub(super) fn monotonic_appender(
        &self,
        id: GlobalId,
    ) -> Result<MonotonicAppender<T>, StorageError<T>> {
        let guard = self
            .append_only_collections
            .lock()
            .expect("CollectionManager panicked");
        let tx = guard
            .get(&id)
            .map(|(tx, _, _)| tx.clone())
            .ok_or(StorageError::IdentifierMissing(id))?;

        Ok(MonotonicAppender::new(tx))
    }

    fn get_read_only(&self, id: GlobalId, force_writable: bool) -> bool {
        if force_writable {
            assert!(id.is_system(), "unexpected non-system global id: {id:?}");
            false
        } else {
            self.read_only
        }
    }
}

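/// Configuration for preparing a differential introspection collection
/// before its write task starts writing.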
pub(crate) struct DifferentialIntrospectionConfig<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    pub(crate) recent_upper: Antichain<T>,
    pub(crate) introspection_type: IntrospectionType,
    pub(crate) storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,
    pub(crate) source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    pub(crate) sink_statistics:
        Arc<Mutex<BTreeMap<GlobalId, statistics::StatsState<SinkStatisticsUpdate>>>>,
    pub(crate) statistics_interval: Duration,
    pub(crate) statistics_interval_receiver: watch::Receiver<Duration>,
    pub(crate) metrics: StorageControllerMetrics,
    pub(crate) introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,
}

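/// A task that writes to a differential collection, reconciling the persist
/// shard with the desired state and periodically bumping the upper.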
struct DifferentialWriteTask<T, R>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
    R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>>
        + Send
        + 'static,
{
    id: GlobalId,

    write_handle: WriteHandle<SourceData, (), T, StorageDiff>,

    read_handle_fn: R,

    read_only: bool,

    now: NowFn,

    upper_tick_interval: tokio::time::Interval,

    cmd_rx: mpsc::UnboundedReceiver<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>,

    shutdown_rx: oneshot::Receiver<()>,

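    /// The contents the collection should have, according to the write ops
    /// received so far.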
    desired: Vec<(Row, Diff)>,

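    /// Updates that still have to be written to persist to bring the shard
    /// in line with `desired`.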
    to_write: Vec<(Row, Diff)>,

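    /// Our latest knowledge of the upper of the collection.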
    current_upper: T,
}

impl<T, R> DifferentialWriteTask<T, R>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
    R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>>
        + Send
        + Sync
        + 'static,
{
    fn spawn(
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        read_handle_fn: R,
        read_only: bool,
        now: NowFn,
        introspection_config: DifferentialIntrospectionConfig<T>,
    ) -> (DifferentialWriteChannel<T>, WriteTask, ShutdownSender) {
        let (tx, rx) = mpsc::unbounded_channel();
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let upper_tick_interval = tokio::time::interval(Duration::from_millis(DEFAULT_TICK_MS));

        let current_upper = T::minimum();

        let task = Self {
            id,
            write_handle,
            read_handle_fn,
            read_only,
            now,
            upper_tick_interval,
            cmd_rx: rx,
            shutdown_rx,
            desired: Vec::new(),
            to_write: Vec::new(),
            current_upper,
        };

        let handle = mz_ore::task::spawn(
            || format!("CollectionManager-differential_write_task-{id}"),
            async move {
                if !task.read_only {
                    task.prepare(introspection_config).await;
                }
                let res = task.run().await;

                match res {
                    ControlFlow::Break(reason) => {
                        info!("write_task-{} ending: {}", id, reason);
                    }
                    c => {
                        unreachable!(
                            "cannot break out of the loop with a Continue, but got: {:?}",
                            c
                        );
                    }
                }
            },
        );

        (tx, handle.abort_on_drop(), shutdown_tx)
    }

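    /// Prepares the introspection collection for writing, e.g. by spawning
    /// statistics scrapers. Only called when the task is not in read-only
    /// mode.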
    async fn prepare(&self, introspection_config: DifferentialIntrospectionConfig<T>) {
        tracing::info!(
            %self.id,
            ?introspection_config.introspection_type,
            "preparing differential introspection collection for writes"
        );

        match introspection_config.introspection_type {
            IntrospectionType::ShardMapping => {
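                // Nothing to prepare: updates arrive through the regular
                // differential write path.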
            }
            IntrospectionType::Frontiers | IntrospectionType::ReplicaFrontiers => {
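                // Nothing to prepare for the frontier collections either.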
            }
            IntrospectionType::StorageSourceStatistics => {
                let prev = snapshot_statistics(
                    self.id,
                    introspection_config.recent_upper,
                    &introspection_config.storage_collections,
                )
                .await;

                let scraper_token = statistics::spawn_statistics_scraper::<
                    statistics::SourceStatistics,
                    SourceStatisticsUpdate,
                    _,
                >(
                    self.id.clone(),
                    introspection_config.collection_manager,
                    Arc::clone(&introspection_config.source_statistics),
                    prev,
                    introspection_config.statistics_interval.clone(),
                    introspection_config.statistics_interval_receiver.clone(),
                    introspection_config.metrics,
                );
                let web_token = statistics::spawn_webhook_statistics_scraper(
                    introspection_config.source_statistics,
                    introspection_config.statistics_interval,
                    introspection_config.statistics_interval_receiver,
                );

                introspection_config
                    .introspection_tokens
                    .lock()
                    .expect("poisoned")
                    .insert(self.id, Box::new((scraper_token, web_token)));
            }
            IntrospectionType::StorageSinkStatistics => {
                let prev = snapshot_statistics(
                    self.id,
                    introspection_config.recent_upper,
                    &introspection_config.storage_collections,
                )
                .await;

                let scraper_token =
                    statistics::spawn_statistics_scraper::<_, SinkStatisticsUpdate, _>(
                        self.id.clone(),
                        introspection_config.collection_manager,
                        Arc::clone(&introspection_config.sink_statistics),
                        prev,
                        introspection_config.statistics_interval,
                        introspection_config.statistics_interval_receiver,
                        introspection_config.metrics,
                    );

                introspection_config
                    .introspection_tokens
                    .lock()
                    .expect("poisoned")
                    .insert(self.id, scraper_token);
            }

            IntrospectionType::ComputeDependencies
            | IntrospectionType::ComputeOperatorHydrationStatus
            | IntrospectionType::ComputeMaterializedViewRefreshes
            | IntrospectionType::ComputeErrorCounts
            | IntrospectionType::ComputeHydrationTimes => {
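                // Nothing to prepare for the compute-managed collections.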
            }

            introspection_type @ IntrospectionType::ReplicaMetricsHistory
            | introspection_type @ IntrospectionType::WallclockLagHistory
            | introspection_type @ IntrospectionType::WallclockLagHistogram
            | introspection_type @ IntrospectionType::PreparedStatementHistory
            | introspection_type @ IntrospectionType::StatementExecutionHistory
            | introspection_type @ IntrospectionType::SessionHistory
            | introspection_type @ IntrospectionType::StatementLifecycleHistory
            | introspection_type @ IntrospectionType::SqlText
            | introspection_type @ IntrospectionType::SourceStatusHistory
            | introspection_type @ IntrospectionType::SinkStatusHistory
            | introspection_type @ IntrospectionType::PrivatelinkConnectionStatusHistory
            | introspection_type @ IntrospectionType::ReplicaStatusHistory => {
                unreachable!("not differential collection: {introspection_type:?}")
            }
        }
    }

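    /// Runs the task's main loop: applying incoming write ops, periodically
    /// bumping the upper, and shutting down when signalled.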
    async fn run(mut self) -> ControlFlow<String> {
        const BATCH_SIZE: usize = 4096;
        let mut updates = Vec::with_capacity(BATCH_SIZE);
        loop {
            tokio::select! {
                biased;

                _ = &mut self.shutdown_rx => {
                    self.handle_shutdown();

                    return ControlFlow::Break("graceful shutdown".to_string());
                }

                count = self.cmd_rx.recv_many(&mut updates, BATCH_SIZE) => {
                    if count > 0 {
                        let _ = self.handle_updates(&mut updates).await?;
                    } else {
                        return ControlFlow::Break("sender has been dropped".to_string());
                    }
                }

                _ = self.upper_tick_interval.tick() => {
                    if self.read_only {
                        continue;
                    }
                    let _ = self.tick_upper().await?;
                },
            }
        }
    }

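    /// Advances the upper of the collection to the current wall-clock time,
    /// without writing any data. On an upper mismatch, syncs to the persist
    /// state instead.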
    async fn tick_upper(&mut self) -> ControlFlow<String> {
        let now = T::from((self.now)());

        if now <= self.current_upper {
            return ControlFlow::Continue(());
        }

        assert!(!self.read_only);
        let res = self
            .write_handle
            .compare_and_append_batch(
                &mut [],
                Antichain::from_elem(self.current_upper.clone()),
                Antichain::from_elem(now.clone()),
            )
            .await
            .expect("valid usage");
        match res {
            Ok(()) => {
                tracing::debug!(%self.id, "bumped upper of differential collection");
                self.current_upper = now;
            }
            Err(err) => {
                let actual_upper = if let Some(ts) = err.current.as_option() {
                    ts.clone()
                } else {
                    return ControlFlow::Break("upper is the empty antichain".to_string());
                };

                tracing::info!(
                    %self.id,
                    ?actual_upper,
                    expected_upper = ?self.current_upper,
                    "upper mismatch while bumping upper, syncing to persist state"
                );

                self.current_upper = actual_upper;

                self.sync_to_persist().await;
            }
        }

        ControlFlow::Continue(())
    }

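    /// Drains any still-pending writes and notifies their senders that the
    /// writes failed because the collection is shutting down.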
    fn handle_shutdown(&mut self) {
        let mut senders = Vec::new();

        self.cmd_rx.close();

        while let Ok((_batch, sender)) = self.cmd_rx.try_recv() {
            senders.push(sender);
        }

        notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));
    }

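    /// Applies a batch of write ops and writes the result to persist,
    /// notifying the responders once the write has completed.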
    async fn handle_updates(
        &mut self,
        batch: &mut Vec<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>,
    ) -> ControlFlow<String> {
        let batch_duration_ms = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT;

        let use_batch_now = Instant::now();
        let min_time_to_complete = use_batch_now + batch_duration_ms;

        tracing::debug!(
            ?use_batch_now,
            ?batch_duration_ms,
            ?min_time_to_complete,
            "batch duration",
        );

        let mut responders = Vec::with_capacity(batch.len());
        for (op, tx) in batch.drain(..) {
            self.apply_write_op(op);
            responders.push(tx);
        }

        consolidation::consolidate(&mut self.desired);
        consolidation::consolidate(&mut self.to_write);

        self.upper_tick_interval.reset();

        self.write_to_persist(responders).await?;

        tokio::time::sleep_until(min_time_to_complete).await;

        ControlFlow::Continue(())
    }

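    /// Applies the given write op to the `desired`/`to_write` state: appends
    /// extend both, deletes turn matching `desired` rows into retractions.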
    fn apply_write_op(&mut self, op: StorageWriteOp) {
        match op {
            StorageWriteOp::Append { updates } => {
                self.desired.extend_from_slice(&updates);
                self.to_write.extend(updates);
            }
            StorageWriteOp::Delete { filter } => {
                let to_delete = self.desired.drain_filter_swapping(|(row, _)| filter(row));
                let retractions = to_delete.map(|(row, diff)| (row, -diff));
                self.to_write.extend(retractions);
            }
        }
    }

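    /// Writes the pending `to_write` updates to persist, retrying on upper
    /// mismatches with bounded backoff. If retries are exhausted, responders
    /// are notified of the failure instead.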
    async fn write_to_persist(
        &mut self,
        responders: Vec<oneshot::Sender<Result<(), StorageError<T>>>>,
    ) -> ControlFlow<String> {
        if self.read_only {
            tracing::debug!(%self.id, "not writing to differential collection: read-only");
            return ControlFlow::Continue(());
        }

        let retries = Retry::default()
            .initial_backoff(Duration::from_secs(1))
            .clamp_backoff(Duration::from_secs(3))
            .factor(1.25)
            .max_tries(20)
            .into_retry_stream();
        let mut retries = Box::pin(retries);

        loop {
            let now = T::from((self.now)());
            let new_upper = std::cmp::max(
                now,
                TimestampManipulation::step_forward(&self.current_upper),
            );

            let updates_to_write = self
                .to_write
                .iter()
                .map(|(row, diff)| {
                    (
                        (SourceData(Ok(row.clone())), ()),
                        self.current_upper.clone(),
                        diff.into_inner(),
                    )
                })
                .collect::<Vec<_>>();

            assert!(!self.read_only);
            let res = self
                .write_handle
                .compare_and_append(
                    updates_to_write,
                    Antichain::from_elem(self.current_upper.clone()),
                    Antichain::from_elem(new_upper.clone()),
                )
                .await
                .expect("valid usage");
            match res {
                Ok(()) => {
                    notify_listeners(responders, || Ok(()));

                    self.current_upper = new_upper;

                    self.to_write.clear();

                    tracing::debug!(%self.id, "appended to differential collection");

                    break;
                }
                Err(err) => {
                    let actual_upper = if let Some(ts) = err.current.as_option() {
                        ts.clone()
                    } else {
                        return ControlFlow::Break("upper is the empty antichain".to_string());
                    };

                    tracing::info!(
                        %self.id,
                        ?actual_upper,
                        expected_upper = ?self.current_upper,
                        "retrying append for differential collection"
                    );

                    if retries.next().await.is_none() {
                        let invalid_upper = InvalidUpper {
                            id: self.id,
                            current_upper: err.current,
                        };
                        notify_listeners(responders, || {
                            Err(StorageError::InvalidUppers(vec![invalid_upper.clone()]))
                        });
                        error!(
                            "exhausted retries when appending to managed collection {}",
                            self.id
                        );
                        break;
                    }

                    self.current_upper = actual_upper;

                    self.sync_to_persist().await;

                    debug!(
                        "Retrying invalid-uppers error while appending to differential collection {}",
                        self.id
                    );
                }
            }
        }

        ControlFlow::Continue(())
    }

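    /// Re-derives `to_write` by reading back the current shard contents and
    /// diffing them against `desired`.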
    async fn sync_to_persist(&mut self) {
        let mut read_handle = (self.read_handle_fn)().await;
        let as_of = self
            .current_upper
            .step_back()
            .unwrap_or_else(|| T::minimum());
        let as_of = Antichain::from_elem(as_of);
        let snapshot = read_handle.snapshot_and_fetch(as_of).await;

        let mut negated_oks = match snapshot {
            Ok(contents) => {
                let mut snapshot = Vec::with_capacity(contents.len());
                for ((data, _), _, diff) in contents {
                    let row = data.expect("invalid protobuf data").0.unwrap();
                    snapshot.push((row, -Diff::from(diff)));
                }
                snapshot
            }
            Err(_) => panic!("read before since"),
        };

        self.to_write.clear();
        self.to_write.extend(self.desired.iter().cloned());
        self.to_write.append(&mut negated_oks);
        consolidation::consolidate(&mut self.to_write);
    }
}

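/// Configuration for preparing an append-only introspection collection
/// before its write task starts writing.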
pub(crate) struct AppendOnlyIntrospectionConfig<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    pub(crate) introspection_type: IntrospectionType,
    pub(crate) config_set: Arc<ConfigSet>,
    pub(crate) parameters: StorageParameters,
    pub(crate) storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
}

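/// A task that writes to an append-only collection, batching incoming
/// updates and appending them at the current wall-clock time.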
struct AppendOnlyWriteTask<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    id: GlobalId,
    write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
    read_only: bool,
    now: NowFn,
    user_batch_duration_ms: Arc<AtomicU64>,
    rx: mpsc::UnboundedReceiver<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<T>>>,
    )>,

    shutdown_rx: oneshot::Receiver<()>,
    previous_statuses: Option<BTreeMap<(GlobalId, Option<ReplicaId>), Status>>,
}

impl<T> AppendOnlyWriteTask<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
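    /// Spawns a task that accepts updates over the returned channel and
    /// appends them to the collection. For status-history collections, the
    /// task also tracks the last reported status per object so that stale
    /// status updates can be dropped.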
    fn spawn(
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        read_only: bool,
        now: NowFn,
        user_batch_duration_ms: Arc<AtomicU64>,
        introspection_config: Option<AppendOnlyIntrospectionConfig<T>>,
    ) -> (AppendOnlyWriteChannel<T>, WriteTask, ShutdownSender) {
        let (tx, rx) = mpsc::unbounded_channel();
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let previous_statuses: Option<BTreeMap<(GlobalId, Option<ReplicaId>), Status>> =
            match introspection_config
                .as_ref()
                .map(|config| config.introspection_type)
            {
                Some(IntrospectionType::SourceStatusHistory)
                | Some(IntrospectionType::SinkStatusHistory) => Some(BTreeMap::new()),

                Some(IntrospectionType::ReplicaMetricsHistory)
                | Some(IntrospectionType::WallclockLagHistory)
                | Some(IntrospectionType::WallclockLagHistogram)
                | Some(IntrospectionType::PrivatelinkConnectionStatusHistory)
                | Some(IntrospectionType::ReplicaStatusHistory)
                | Some(IntrospectionType::PreparedStatementHistory)
                | Some(IntrospectionType::StatementExecutionHistory)
                | Some(IntrospectionType::SessionHistory)
                | Some(IntrospectionType::StatementLifecycleHistory)
                | Some(IntrospectionType::SqlText)
                | None => None,

                Some(introspection_type @ IntrospectionType::ShardMapping)
                | Some(introspection_type @ IntrospectionType::Frontiers)
                | Some(introspection_type @ IntrospectionType::ReplicaFrontiers)
                | Some(introspection_type @ IntrospectionType::StorageSourceStatistics)
                | Some(introspection_type @ IntrospectionType::StorageSinkStatistics)
                | Some(introspection_type @ IntrospectionType::ComputeDependencies)
                | Some(introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus)
                | Some(introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes)
                | Some(introspection_type @ IntrospectionType::ComputeErrorCounts)
                | Some(introspection_type @ IntrospectionType::ComputeHydrationTimes) => {
                    unreachable!("not append-only collection: {introspection_type:?}")
                }
            };

        let mut task = Self {
            id,
            write_handle,
            rx,
            shutdown_rx,
            read_only,
            now,
            user_batch_duration_ms,
            previous_statuses,
        };

        let handle = mz_ore::task::spawn(
            || format!("CollectionManager-append_only_write_task-{id}"),
            async move {
                if !task.read_only {
                    task.prepare(introspection_config).await;
                }
                task.run().await;
            },
        );

        (tx, handle.abort_on_drop(), shutdown_tx)
    }

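    /// Prepares the introspection collection for writing, e.g. by truncating
    /// entries that are past their retention window. Only called when the
    /// task is not in read-only mode.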
    async fn prepare(&mut self, introspection_config: Option<AppendOnlyIntrospectionConfig<T>>) {
        let Some(AppendOnlyIntrospectionConfig {
            introspection_type,
            config_set,
            parameters,
            storage_collections,
        }) = introspection_config
        else {
            return;
        };
        let initial_statuses = match introspection_type {
            IntrospectionType::ReplicaMetricsHistory
            | IntrospectionType::WallclockLagHistory
            | IntrospectionType::WallclockLagHistogram => {
                let result = partially_truncate_metrics_history(
                    self.id,
                    introspection_type,
                    &mut self.write_handle,
                    config_set,
                    self.now.clone(),
                    storage_collections,
                )
                .await;
                if let Err(error) = result {
                    soft_panic_or_log!(
                        "error truncating metrics history: {error} (type={introspection_type:?})"
                    );
                }
                Vec::new()
            }

            IntrospectionType::PrivatelinkConnectionStatusHistory => {
                partially_truncate_status_history(
                    self.id,
                    IntrospectionType::PrivatelinkConnectionStatusHistory,
                    &mut self.write_handle,
                    privatelink_status_history_desc(&parameters),
                    self.now.clone(),
                    &storage_collections,
                )
                .await;
                Vec::new()
            }
            IntrospectionType::ReplicaStatusHistory => {
                partially_truncate_status_history(
                    self.id,
                    IntrospectionType::ReplicaStatusHistory,
                    &mut self.write_handle,
                    replica_status_history_desc(&parameters),
                    self.now.clone(),
                    &storage_collections,
                )
                .await;
                Vec::new()
            }

            IntrospectionType::PreparedStatementHistory
            | IntrospectionType::StatementExecutionHistory
            | IntrospectionType::SessionHistory
            | IntrospectionType::StatementLifecycleHistory
            | IntrospectionType::SqlText => {
                Vec::new()
            }

            IntrospectionType::SourceStatusHistory => {
                let last_status_per_id = partially_truncate_status_history(
                    self.id,
                    IntrospectionType::SourceStatusHistory,
                    &mut self.write_handle,
                    source_status_history_desc(&parameters),
                    self.now.clone(),
                    &storage_collections,
                )
                .await;

                let status_col = MZ_SOURCE_STATUS_HISTORY_DESC
                    .get_by_name(&ColumnName::from("status"))
                    .expect("schema has not changed")
                    .0;

                last_status_per_id
                    .into_iter()
                    .map(|(id, row)| {
                        (
                            id,
                            Status::from_str(
                                row.iter()
                                    .nth(status_col)
                                    .expect("schema has not changed")
                                    .unwrap_str(),
                            )
                            .expect("statuses must be uncorrupted"),
                        )
                    })
                    .collect()
            }
            IntrospectionType::SinkStatusHistory => {
                let last_status_per_id = partially_truncate_status_history(
                    self.id,
                    IntrospectionType::SinkStatusHistory,
                    &mut self.write_handle,
                    sink_status_history_desc(&parameters),
                    self.now.clone(),
                    &storage_collections,
                )
                .await;

                let status_col = MZ_SINK_STATUS_HISTORY_DESC
                    .get_by_name(&ColumnName::from("status"))
                    .expect("schema has not changed")
                    .0;

                last_status_per_id
                    .into_iter()
                    .map(|(id, row)| {
                        (
                            id,
                            Status::from_str(
                                row.iter()
                                    .nth(status_col)
                                    .expect("schema has not changed")
                                    .unwrap_str(),
                            )
                            .expect("statuses must be uncorrupted"),
                        )
                    })
                    .collect()
            }

            introspection_type @ IntrospectionType::ShardMapping
            | introspection_type @ IntrospectionType::Frontiers
            | introspection_type @ IntrospectionType::ReplicaFrontiers
            | introspection_type @ IntrospectionType::StorageSourceStatistics
            | introspection_type @ IntrospectionType::StorageSinkStatistics
            | introspection_type @ IntrospectionType::ComputeDependencies
            | introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus
            | introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes
            | introspection_type @ IntrospectionType::ComputeErrorCounts
            | introspection_type @ IntrospectionType::ComputeHydrationTimes => {
                unreachable!("not append-only collection: {introspection_type:?}")
            }
        };
        if let Some(previous_statuses) = &mut self.previous_statuses {
            previous_statuses.extend(initial_statuses);
        }
    }

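    /// Runs the task's main loop: appending batched updates and periodically
    /// appending empty batches to keep the upper moving.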
    async fn run(mut self) {
        let mut interval = tokio::time::interval(Duration::from_millis(DEFAULT_TICK_MS));

        const BATCH_SIZE: usize = 4096;
        let mut batch: Vec<(Vec<_>, _)> = Vec::with_capacity(BATCH_SIZE);

        'run: loop {
            tokio::select! {
                biased;

                _ = &mut self.shutdown_rx => {
                    let mut senders = Vec::new();

                    self.rx.close();

                    while let Ok((_batch, sender)) = self.rx.try_recv() {
                        senders.push(sender);
                    }

                    notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));

                    break 'run;
                }

                count = self.rx.recv_many(&mut batch, BATCH_SIZE) => {
                    if count > 0 {
                        let batch_duration_ms = match self.id {
                            GlobalId::User(_) => Duration::from_millis(self.user_batch_duration_ms.load(Ordering::Relaxed)),
                            _ => STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT,
                        };
                        let use_batch_now = Instant::now();
                        let min_time_to_complete = use_batch_now + batch_duration_ms;

                        tracing::debug!(
                            ?use_batch_now,
                            ?batch_duration_ms,
                            ?min_time_to_complete,
                            "batch duration",
                        );

                        interval.reset();

                        let mut all_rows = Vec::with_capacity(batch.iter().map(|(rows, _)| rows.len()).sum());
                        let mut responders = Vec::with_capacity(batch.len());

                        for (updates, responder) in batch.drain(..) {
                            let rows = self.process_updates(updates);

                            all_rows.extend(rows.map(|(row, diff)| TimestamplessUpdate { row, diff }));
                            responders.push(responder);
                        }

                        if self.read_only {
                            tracing::warn!(%self.id, ?all_rows, "append while in read-only mode");
                            notify_listeners(responders, || Err(StorageError::ReadOnly));
                            continue;
                        }

                        let at_least = T::from((self.now)());

                        if !all_rows.is_empty() {
                            monotonic_append(&mut self.write_handle, all_rows, at_least).await;
                        }
                        notify_listeners(responders, || Ok(()));

                        tokio::time::sleep_until(min_time_to_complete).await;
                    } else {
                        break 'run;
                    }
                }

                _ = interval.tick() => {
                    if self.read_only {
                        continue;
                    }

                    let now = T::from((self.now)());
                    let updates = vec![];
                    let at_least = now.clone();

                    monotonic_append(&mut self.write_handle, updates, at_least).await;
                },
            }
        }

        info!("write_task-{} ending", self.id);
    }

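    /// Converts updates into rows, dropping status updates that do not
    /// supersede the previously recorded status for the same object and
    /// recording the statuses that remain.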
    fn process_updates(
        &mut self,
        updates: Vec<AppendOnlyUpdate>,
    ) -> impl Iterator<Item = (Row, Diff)> {
        let updates = if let Some(previous_statuses) = &mut self.previous_statuses {
            let new: Vec<_> = updates
                .into_iter()
                .filter(|r| match r {
                    AppendOnlyUpdate::Row(_) => true,
                    AppendOnlyUpdate::Status(update) => {
                        match (
                            previous_statuses
                                .get(&(update.id, update.replica_id))
                                .as_deref(),
                            &update.status,
                        ) {
                            (None, _) => true,
                            (Some(old), new) => old.superseded_by(*new),
                        }
                    }
                })
                .collect();
            previous_statuses.extend(new.iter().filter_map(|update| match update {
                AppendOnlyUpdate::Row(_) => None,
                AppendOnlyUpdate::Status(update) => {
                    Some(((update.id, update.replica_id), update.status))
                }
            }));
            new
        } else {
            updates
        };

        updates.into_iter().map(AppendOnlyUpdate::into_row)
    }
}

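/// Truncates the given metrics history collection by appending retractions
/// for all entries older than the configured retention interval.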
async fn partially_truncate_metrics_history<T>(
    id: GlobalId,
    introspection_type: IntrospectionType,
    write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
    config_set: Arc<ConfigSet>,
    now: NowFn,
    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
) -> Result<(), anyhow::Error>
where
    T: Codec64 + From<EpochMillis> + TimestampManipulation,
{
    let (keep_duration, occurred_at_col) = match introspection_type {
        IntrospectionType::ReplicaMetricsHistory => (
            REPLICA_METRICS_HISTORY_RETENTION_INTERVAL.get(&config_set),
            REPLICA_METRICS_HISTORY_DESC
                .get_by_name(&ColumnName::from("occurred_at"))
                .expect("schema has not changed")
                .0,
        ),
        IntrospectionType::WallclockLagHistory => (
            WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL.get(&config_set),
            WALLCLOCK_LAG_HISTORY_DESC
                .get_by_name(&ColumnName::from("occurred_at"))
                .expect("schema has not changed")
                .0,
        ),
        IntrospectionType::WallclockLagHistogram => (
            WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL.get(&config_set),
            WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC
                .get_by_name(&ColumnName::from("period_start"))
                .expect("schema has not changed")
                .0,
        ),
        _ => panic!("not a metrics history: {introspection_type:?}"),
    };

    let upper = write_handle.fetch_recent_upper().await;
    let Some(upper_ts) = upper.as_option() else {
        bail!("collection is sealed");
    };
    let Some(as_of_ts) = upper_ts.step_back() else {
        return Ok(());
    };

    let mut rows = storage_collections
        .snapshot(id, as_of_ts)
        .await
        .map_err(|e| anyhow!("reading snapshot: {e:?}"))?;

    let now = mz_ore::now::to_datetime(now());
    let keep_since = now - keep_duration;

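    // Negate the diffs of rows that fall outside the retention window and
    // zero out all others, so that after consolidation only the needed
    // retractions remain.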
    for (row, diff) in &mut rows {
        let datums = row.unpack();
        let occurred_at = datums[occurred_at_col].unwrap_timestamptz();
        *diff = if *occurred_at < keep_since { -*diff } else { Diff::ZERO };
    }

    consolidation::consolidate(&mut rows);

    if rows.is_empty() {
        return Ok(());
    }

    let old_upper_ts = upper_ts.clone();
    let write_ts = old_upper_ts.clone();
    let new_upper_ts = TimestampManipulation::step_forward(&old_upper_ts);

    let updates = rows
        .into_iter()
        .map(|(row, diff)| ((SourceData(Ok(row)), ()), write_ts.clone(), diff.into_inner()));

    write_handle
        .compare_and_append(
            updates,
            Antichain::from_elem(old_upper_ts),
            Antichain::from_elem(new_upper_ts),
        )
        .await
        .expect("valid usage")
        .map_err(|e| anyhow!("appending retractions: {e:?}"))
}

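/// Truncates the given status history collection according to its retention
/// policy and returns the most recent status row per key.
///
/// Retractions are appended at the current upper; on an upper mismatch the
/// failed truncation is only logged.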
pub(crate) async fn partially_truncate_status_history<T, K>(
    id: GlobalId,
    introspection_type: IntrospectionType,
    write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
    status_history_desc: StatusHistoryDesc<K>,
    now: NowFn,
    storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
) -> BTreeMap<K, Row>
where
    T: Codec64 + From<EpochMillis> + TimestampManipulation,
    K: Clone + Debug + Ord + Send + Sync,
{
    let upper = write_handle.fetch_recent_upper().await.clone();

    let mut rows = match upper.as_option() {
        Some(f) if f > &T::minimum() => {
            let as_of = f.step_back().unwrap();

            storage_collections
                .snapshot(id, as_of)
                .await
                .expect("snapshot succeeds")
        }
        _ => return BTreeMap::new(),
    };

    let mut latest_row_per_key: BTreeMap<K, (CheckedTimestamp<DateTime<Utc>>, Row)> =
        BTreeMap::new();

    differential_dataflow::consolidation::consolidate(&mut rows);

    let mut deletions = vec![];

    let mut handle_row = {
        let latest_row_per_key = &mut latest_row_per_key;
        move |row: &Row, diff| {
            let datums = row.unpack();
            let key = (status_history_desc.extract_key)(&datums);
            let timestamp = (status_history_desc.extract_time)(&datums);

            assert!(
                diff > Diff::ZERO,
                "only know how to operate over consolidated data with diffs > 0, \
                found diff {diff} for object {key:?} in {introspection_type:?}",
            );

            match latest_row_per_key.get(&key) {
                Some(existing) if &existing.0 > &timestamp => {}
                _ => {
                    latest_row_per_key.insert(key.clone(), (timestamp, row.clone()));
                }
            };
            (key, timestamp)
        }
    };

    match status_history_desc.retention_policy {
        StatusHistoryRetentionPolicy::LastN(n) => {
            let mut last_n_entries_per_key: BTreeMap<
                K,
                BinaryHeap<Reverse<(CheckedTimestamp<DateTime<Utc>>, Row)>>,
            > = BTreeMap::new();

            for (row, diff) in rows {
                let (key, timestamp) = handle_row(&row, diff);

                let entries = last_n_entries_per_key.entry(key).or_default();
                for _ in 0..diff.into_inner() {
                    entries.push(Reverse((timestamp, row.clone())));

                    while entries.len() > n {
                        if let Some(Reverse((_, r))) = entries.pop() {
                            deletions.push(r);
                        }
                    }
                }
            }
        }
        StatusHistoryRetentionPolicy::TimeWindow(time_window) => {
            let now = mz_ore::now::to_datetime(now());
            let keep_since = now - time_window;

            for (row, diff) in rows {
                let (_, timestamp) = handle_row(&row, diff);

                if *timestamp < keep_since {
                    deletions.push(row);
                }
            }
        }
    }

    let expected_upper = upper.into_option().expect("checked above");
    let new_upper = TimestampManipulation::step_forward(&expected_upper);

    let updates = deletions
        .into_iter()
        .map(|row| ((SourceData(Ok(row)), ()), expected_upper.clone(), -1))
        .collect::<Vec<_>>();

    let res = write_handle
        .compare_and_append(
            updates,
            Antichain::from_elem(expected_upper.clone()),
            Antichain::from_elem(new_upper),
        )
        .await
        .expect("usage was valid");

    match res {
        Ok(_) => {}
        Err(err) => {
            info!(
                %id, ?expected_upper, current_upper = ?err.current,
                "failed to append partial truncation",
            );
        }
    }

    latest_row_per_key
        .into_iter()
        .map(|(key, (_, row))| (key, row))
        .collect()
}

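/// Appends `updates` at the collection's current upper, or at `at_least` if
/// the upper is behind it, retrying on upper mismatches until the write
/// lands.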
async fn monotonic_append<T: Timestamp + Lattice + Codec64 + TimestampManipulation>(
    write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
    updates: Vec<TimestamplessUpdate>,
    at_least: T,
) {
    let mut expected_upper = write_handle.shared_upper();
    loop {
        if updates.is_empty() && expected_upper.is_empty() {
            return;
        }

        let upper = expected_upper
            .into_option()
            .expect("cannot append data to closed collection");

        let lower = if upper.less_than(&at_least) {
            at_least.clone()
        } else {
            upper.clone()
        };

        let new_upper = TimestampManipulation::step_forward(&lower);
        let updates = updates
            .iter()
            .map(|TimestamplessUpdate { row, diff }| {
                (
                    (SourceData(Ok(row.clone())), ()),
                    lower.clone(),
                    diff.into_inner(),
                )
            })
            .collect::<Vec<_>>();
        let res = write_handle
            .compare_and_append(
                updates,
                Antichain::from_elem(upper),
                Antichain::from_elem(new_upper),
            )
            .await
            .expect("valid usage");
        match res {
            Ok(()) => return,
            Err(err) => {
                expected_upper = err.current;
                continue;
            }
        }
    }
}

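/// Sends the result produced by `result` to all responders, ignoring
/// dropped receivers.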
fn notify_listeners<T>(
    responders: impl IntoIterator<Item = oneshot::Sender<T>>,
    result: impl Fn() -> T,
) {
    for r in responders {
        let _ = r.send(result());
    }
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use super::*;
    use mz_repr::{Datum, Row};
    use mz_storage_client::client::StatusUpdate;
    use mz_storage_client::healthcheck::{
        MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC,
    };

    #[mz_ore::test]
    fn test_row() {
        let error_message = "error message";
        let hint = "hint message";
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let row = Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: Some(error_message.to_string()),
            hints: BTreeSet::from([hint.to_string()]),
            namespaced_errors: Default::default(),
            replica_id: None,
        });

        for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of(column_type));
        }

        for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of(column_type));
        }

        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
        assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));

        let details = row
            .iter()
            .nth(4)
            .unwrap()
            .unwrap_map()
            .iter()
            .collect::<Vec<_>>();

        assert_eq!(details.len(), 1);
        let hint_datum = &details[0];

        assert_eq!(hint_datum.0, "hints");
        assert_eq!(
            hint_datum.1.unwrap_list().iter().next().unwrap(),
            Datum::String(hint)
        );
    }

    #[mz_ore::test]
    fn test_row_without_hint() {
        let error_message = "error message";
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let row = Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: Some(error_message.to_string()),
            hints: Default::default(),
            namespaced_errors: Default::default(),
            replica_id: None,
        });

        for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of(column_type));
        }

        for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of(column_type));
        }

        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
        assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
        assert_eq!(row.iter().nth(4).unwrap(), Datum::Null);
    }

    #[mz_ore::test]
    fn test_row_without_error() {
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let hint = "hint message";
        let row = Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: None,
            hints: BTreeSet::from([hint.to_string()]),
            namespaced_errors: Default::default(),
            replica_id: None,
        });

        for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of(column_type));
        }

        for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of(column_type));
        }

        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
        assert_eq!(row.iter().nth(3).unwrap(), Datum::Null);

        let details = row
            .iter()
            .nth(4)
            .unwrap()
            .unwrap_map()
            .iter()
            .collect::<Vec<_>>();

        assert_eq!(details.len(), 1);
        let hint_datum = &details[0];

        assert_eq!(hint_datum.0, "hints");
        assert_eq!(
            hint_datum.1.unwrap_list().iter().next().unwrap(),
            Datum::String(hint)
        );
    }

    #[mz_ore::test]
    fn test_row_with_namespaced() {
        let error_message = "error message";
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let row = Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: Some(error_message.to_string()),
            hints: Default::default(),
            namespaced_errors: BTreeMap::from([("thing".to_string(), "error".to_string())]),
            replica_id: None,
        });

        for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of(column_type));
        }

        for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of(column_type));
        }

        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
        assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));

        let details = row
            .iter()
            .nth(4)
            .unwrap()
            .unwrap_map()
            .iter()
            .collect::<Vec<_>>();

        assert_eq!(details.len(), 1);
        let ns_datum = &details[0];

        assert_eq!(ns_datum.0, "namespaced");
        assert_eq!(
            ns_datum.1.unwrap_map().iter().next().unwrap(),
            ("thing", Datum::String("error"))
        );
    }

    #[mz_ore::test]
    fn test_row_with_everything() {
        let error_message = "error message";
        let hint = "hint message";
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let row = Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: Some(error_message.to_string()),
            hints: BTreeSet::from([hint.to_string()]),
            namespaced_errors: BTreeMap::from([("thing".to_string(), "error".to_string())]),
            replica_id: None,
        });

        for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of(column_type));
        }

        for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of(column_type));
        }

        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
        assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));

        let details = row
            .iter()
            .nth(4)
            .unwrap()
            .unwrap_map()
            .iter()
            .collect::<Vec<_>>();

        assert_eq!(details.len(), 2);
        let hint_datum = &details[0];
        let ns_datum = &details[1];

        assert_eq!(hint_datum.0, "hints");
        assert_eq!(
            hint_datum.1.unwrap_list().iter().next().unwrap(),
            Datum::String(hint)
        );

        assert_eq!(ns_datum.0, "namespaced");
        assert_eq!(
            ns_datum.1.unwrap_map().iter().next().unwrap(),
            ("thing", Datum::String("error"))
        );
    }
}