1use std::any::Any;
61use std::cmp::Reverse;
62use std::collections::{BTreeMap, BinaryHeap};
63use std::fmt::Debug;
64use std::ops::ControlFlow;
65use std::pin::Pin;
66use std::str::FromStr;
67use std::sync::atomic::{AtomicU64, Ordering};
68use std::sync::{Arc, Mutex};
69
70use anyhow::{anyhow, bail};
71use chrono::{DateTime, Utc};
72use differential_dataflow::consolidation;
73use futures::future::BoxFuture;
74use futures::stream::StreamExt;
75use futures::{Future, FutureExt};
76use mz_cluster_client::ReplicaId;
77use mz_dyncfg::ConfigSet;
78use mz_ore::now::NowFn;
79use mz_ore::retry::Retry;
80use mz_ore::soft_panic_or_log;
81use mz_ore::task::AbortOnDropHandle;
82use mz_persist_client::batch::Added;
83use mz_persist_client::read::ReadHandle;
84use mz_persist_client::write::WriteHandle;
85use mz_repr::adt::timestamp::CheckedTimestamp;
86use mz_repr::{ColumnName, DatumVec, Diff, GlobalId, Row, Timestamp};
87use mz_storage_client::client::{AppendOnlyUpdate, Status, TimestamplessUpdate};
88use mz_storage_client::controller::{IntrospectionType, MonotonicAppender, StorageWriteOp};
89use mz_storage_client::healthcheck::{
90 MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_METRICS_HISTORY_DESC,
91 WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC, WALLCLOCK_LAG_HISTORY_DESC,
92};
93use mz_storage_client::metrics::StorageControllerMetrics;
94use mz_storage_client::statistics::ControllerSinkStatistics;
95use mz_storage_client::storage_collections::StorageCollections;
96use mz_storage_types::StorageDiff;
97use mz_storage_types::controller::InvalidUpper;
98use mz_storage_types::dyncfgs::{
99 REPLICA_METRICS_HISTORY_RETENTION_INTERVAL, WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL,
100 WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL,
101};
102use mz_storage_types::parameters::{
103 STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT, StorageParameters,
104};
105use mz_storage_types::sources::SourceData;
106use timely::progress::Antichain;
107use tokio::sync::{mpsc, oneshot, watch};
108use tokio::time::{Duration, Instant};
109use tracing::{debug, error, info};
110
111use crate::{
112 StatusHistoryDesc, StatusHistoryRetentionPolicy, StorageError, collection_mgmt,
113 privatelink_status_history_desc, replica_status_history_desc, sink_status_history_desc,
114 snapshot_statistics, source_status_history_desc, statistics,
115};
116
/// Period, in milliseconds, of the `tokio::time::interval`s that the write tasks in this file
/// create for their periodic tick.
const DEFAULT_TICK_MS: u64 = 1_000;

/// Sending half of a differential write task's command channel. Each message pairs a
/// [`StorageWriteOp`] with a oneshot sender on which the outcome of the write is reported.
type DifferentialWriteChannel =
    mpsc::UnboundedSender<(StorageWriteOp, oneshot::Sender<Result<(), StorageError>>)>;

/// Sending half of an append-only write task's command channel. Each message pairs a batch of
/// [`AppendOnlyUpdate`]s with a oneshot sender on which the outcome of the write is reported.
type AppendOnlyWriteChannel = mpsc::UnboundedSender<(
    Vec<AppendOnlyUpdate>,
    oneshot::Sender<Result<(), StorageError>>,
)>;

/// Handle to a spawned write task, obtained via `abort_on_drop()` on the spawned task's handle.
type WriteTask = AbortOnDropHandle<()>;
/// Oneshot sender used to ask a write task to shut down.
type ShutdownSender = oneshot::Sender<()>;
132
/// The two kinds of collection handled in this module.
///
/// The derives make the kind cheap to copy, comparable and printable, which a field-less public
/// enum is expected to be.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectionManagerKind {
    /// A collection that is written through the append-only write path.
    AppendOnly,
    /// A collection that is written through the differential write path.
    Differential,
}
149
/// Registry of the background write tasks for managed collections.
///
/// Cloning is cheap with respect to the registries: both maps and the batch duration live
/// behind `Arc`s, so clones share them.
#[derive(Debug, Clone)]
pub struct CollectionManager {
    /// Manager-wide read-only flag. `get_read_only` returns it unless a registration forces the
    /// collection writable, and `blind_write` panics while it is set.
    read_only: bool,

    /// Per-collection state of the differential write tasks: the command sender, the task handle
    /// and the sender used to request shutdown.
    differential_collections:
        Arc<Mutex<BTreeMap<GlobalId, (DifferentialWriteChannel, WriteTask, ShutdownSender)>>>,

    /// Per-collection state of the append-only write tasks: the command sender, the task handle
    /// and the sender used to request shutdown.
    append_only_collections:
        Arc<Mutex<BTreeMap<GlobalId, (AppendOnlyWriteChannel, WriteTask, ShutdownSender)>>>,

    /// Batch duration in milliseconds, shared with every append-only write task, which applies it
    /// to `GlobalId::User` collections. Updated by `update_user_batch_duration`.
    user_batch_duration_ms: Arc<AtomicU64>,
    /// Clock handed to every spawned write task.
    now: NowFn,
}
176
177impl CollectionManager {
187 pub(super) fn new(read_only: bool, now: NowFn) -> CollectionManager {
188 let batch_duration_ms: u64 = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT
189 .as_millis()
190 .try_into()
191 .expect("known to fit");
192
193 CollectionManager {
194 read_only,
195 differential_collections: Arc::new(Mutex::new(BTreeMap::new())),
196 append_only_collections: Arc::new(Mutex::new(BTreeMap::new())),
197 user_batch_duration_ms: Arc::new(AtomicU64::new(batch_duration_ms)),
198 now,
199 }
200 }
201
202 pub fn update_user_batch_duration(&self, duration: Duration) {
204 tracing::info!(?duration, "updating user batch duration");
205 let millis: u64 = duration.as_millis().try_into().unwrap_or(u64::MAX);
206 self.user_batch_duration_ms.store(millis, Ordering::Relaxed);
207 }
208
209 pub(super) fn register_differential_collection<R>(
217 &self,
218 id: GlobalId,
219 write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
220 read_handle_fn: R,
221 force_writable: bool,
222 introspection_config: DifferentialIntrospectionConfig,
223 ) where
224 R: FnMut() -> Pin<
225 Box<dyn Future<Output = ReadHandle<SourceData, (), Timestamp, StorageDiff>> + Send>,
226 > + Send
227 + Sync
228 + 'static,
229 {
230 let mut guard = self
231 .differential_collections
232 .lock()
233 .expect("collection_mgmt panicked");
234
235 if let Some((_writer, task, _shutdown_tx)) = guard.get(&id) {
237 if !task.is_finished() {
239 tracing::error!("Registered a collection twice! {id:?}");
241 return;
242 }
243 }
244
245 let read_only = self.get_read_only(id, force_writable);
246
247 let writer_and_handle = DifferentialWriteTask::spawn(
249 id,
250 write_handle,
251 read_handle_fn,
252 read_only,
253 self.now.clone(),
254 introspection_config,
255 );
256 let prev = guard.insert(id, writer_and_handle);
257
258 if let Some((_, prev_task, _)) = prev {
260 assert!(
261 prev_task.is_finished(),
262 "should only spawn a new task if the previous is finished"
263 );
264 }
265 }
266
267 pub(super) fn register_append_only_collection(
272 &self,
273 id: GlobalId,
274 write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
275 force_writable: bool,
276 introspection_config: Option<AppendOnlyIntrospectionConfig>,
277 ) {
278 let mut guard = self
279 .append_only_collections
280 .lock()
281 .expect("collection_mgmt panicked");
282
283 if let Some((_writer, task, _shutdown_tx)) = guard.get(&id) {
285 if !task.is_finished() {
287 tracing::error!("Registered a collection twice! {id:?}");
289 return;
290 }
291 }
292
293 let read_only = self.get_read_only(id, force_writable);
294
295 let writer_and_handle = AppendOnlyWriteTask::spawn(
297 id,
298 write_handle,
299 read_only,
300 self.now.clone(),
301 Arc::clone(&self.user_batch_duration_ms),
302 introspection_config,
303 );
304 let prev = guard.insert(id, writer_and_handle);
305
306 if let Some((_, prev_task, _)) = prev {
308 assert!(
309 prev_task.is_finished(),
310 "should only spawn a new task if the previous is finished"
311 );
312 }
313 }
314
315 #[mz_ore::instrument(level = "debug")]
320 pub(super) fn unregister_collection(&self, id: GlobalId) -> BoxFuture<'static, ()> {
321 let prev = self
322 .differential_collections
323 .lock()
324 .expect("CollectionManager panicked")
325 .remove(&id);
326
327 if let Some((_prev_writer, prev_task, shutdown_tx)) = prev {
329 let _ = shutdown_tx.send(());
333 return Box::pin(prev_task.map(|_| ()));
334 }
335
336 let prev = self
337 .append_only_collections
338 .lock()
339 .expect("CollectionManager panicked")
340 .remove(&id);
341
342 if let Some((_prev_writer, prev_task, shutdown_tx)) = prev {
344 let _ = shutdown_tx.send(());
348 return Box::pin(prev_task.map(|_| ()));
349 }
350
351 Box::pin(futures::future::ready(()))
352 }
353
354 pub(super) fn append_only_write_sender(&self, id: GlobalId) -> AppendOnlyWriteChannel {
359 let collections = self.append_only_collections.lock().expect("poisoned");
360 match collections.get(&id) {
361 Some((tx, _, _)) => tx.clone(),
362 None => panic!("missing append-only collection: {id}"),
363 }
364 }
365
366 pub(super) fn differential_write_sender(&self, id: GlobalId) -> DifferentialWriteChannel {
371 let collections = self.differential_collections.lock().expect("poisoned");
372 match collections.get(&id) {
373 Some((tx, _, _)) => tx.clone(),
374 None => panic!("missing differential collection: {id}"),
375 }
376 }
377
378 pub(super) fn blind_write(&self, id: GlobalId, updates: Vec<AppendOnlyUpdate>) {
386 if self.read_only {
387 panic!("attempting blind write to {} while in read-only mode", id);
388 }
389
390 if updates.is_empty() {
391 return;
392 }
393
394 let collections = self.append_only_collections.lock().expect("poisoned");
395 match collections.get(&id) {
396 Some((update_tx, _, _)) => {
397 let (tx, _rx) = oneshot::channel();
398 update_tx.send((updates, tx)).expect("rx hung up");
399 }
400 None => panic!("missing append-only collection: {id}"),
401 }
402 }
403
404 pub(super) fn differential_write(&self, id: GlobalId, op: StorageWriteOp) {
412 if op.is_empty_append() {
413 return;
414 }
415
416 let collections = self.differential_collections.lock().expect("poisoned");
417 match collections.get(&id) {
418 Some((update_tx, _, _)) => {
419 let (tx, _rx) = oneshot::channel();
420 update_tx.send((op, tx)).expect("rx hung up");
421 }
422 None => panic!("missing differential collection: {id}"),
423 }
424 }
425
426 pub(super) fn differential_append(&self, id: GlobalId, updates: Vec<(Row, Diff)>) {
432 self.differential_write(id, StorageWriteOp::Append { updates })
433 }
434
435 pub(super) fn monotonic_appender(
438 &self,
439 id: GlobalId,
440 ) -> Result<MonotonicAppender, StorageError> {
441 let guard = self
442 .append_only_collections
443 .lock()
444 .expect("CollectionManager panicked");
445 let tx = guard
446 .get(&id)
447 .map(|(tx, _, _)| tx.clone())
448 .ok_or(StorageError::IdentifierMissing(id))?;
449
450 Ok(MonotonicAppender::new(tx))
451 }
452
453 fn get_read_only(&self, id: GlobalId, force_writable: bool) -> bool {
454 if force_writable {
455 assert!(id.is_system(), "unexpected non-system global id: {id:?}");
456 false
457 } else {
458 self.read_only
459 }
460 }
461}
462
/// Everything `DifferentialWriteTask::prepare` needs to set up a differential introspection
/// collection before the task starts writing.
pub(crate) struct DifferentialIntrospectionConfig {
    /// Upper passed to `snapshot_statistics` when preparing the statistics collections.
    pub(crate) recent_upper: Antichain<Timestamp>,
    /// Which introspection collection is being prepared; selects the preparation branch.
    pub(crate) introspection_type: IntrospectionType,
    /// Passed by reference to `snapshot_statistics`.
    pub(crate) storage_collections: Arc<dyn StorageCollections + Send + Sync>,
    /// Manager handed to the statistics scraper.
    pub(crate) collection_manager: collection_mgmt::CollectionManager,
    /// Shared source statistics, handed to the source and webhook statistics scrapers.
    pub(crate) source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    /// Shared sink statistics, handed to the sink statistics scraper.
    pub(crate) sink_statistics:
        Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    /// Statistics interval passed to the scrapers when they are spawned.
    pub(crate) statistics_interval: Duration,
    /// Watch receiver passed to the scrapers alongside `statistics_interval`.
    pub(crate) statistics_interval_receiver: watch::Receiver<Duration>,
    /// Retention duration passed to the statistics scraper.
    pub(crate) statistics_retention_duration: Duration,
    /// Metrics passed to the statistics scraper.
    pub(crate) metrics: StorageControllerMetrics,
    /// Map into which `prepare` stores the scraper tokens, keyed by collection id.
    pub(crate) introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,
}
477
/// State of the background task that writes one differential collection.
///
/// The task keeps an in-memory copy of the contents it wants the collection to have
/// (`desired`) and the diffs that still have to be written to get there (`to_write`).
struct DifferentialWriteTask<R>
where
    R: FnMut() -> Pin<
            Box<dyn Future<Output = ReadHandle<SourceData, (), Timestamp, StorageDiff>> + Send>,
        > + Send
        + 'static,
{
    /// Id of the collection this task writes to; also used in the task name and log lines.
    id: GlobalId,

    /// Handle used for all appends and upper bumps.
    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,

    /// Produces a fresh read handle each time `sync_to_persist` needs a snapshot.
    read_handle_fn: R,

    /// When set, the task skips `prepare`, never bumps the upper and never writes.
    read_only: bool,

    /// Clock used to choose new uppers.
    now: NowFn,

    /// Periodic tick that triggers `tick_upper`; reset whenever a batch of updates is handled.
    upper_tick_interval: tokio::time::Interval,

    /// Receiving half of the command channel.
    cmd_rx: mpsc::UnboundedReceiver<(StorageWriteOp, oneshot::Sender<Result<(), StorageError>>)>,

    /// Resolves when the task is asked to shut down.
    shutdown_rx: oneshot::Receiver<()>,

    /// In-memory copy of the desired contents: appends add to it, deletes extract from it.
    desired: Vec<(Row, Diff)>,

    /// Diffs not yet written. Cleared after a successful append and rebuilt by
    /// `sync_to_persist` as `desired` minus the current snapshot.
    to_write: Vec<(Row, Diff)>,

    /// The upper this task expects the collection to have; starts at `Timestamp::MIN`.
    current_upper: Timestamp,
}
537
538impl<R> DifferentialWriteTask<R>
539where
540 R: FnMut() -> Pin<
541 Box<dyn Future<Output = ReadHandle<SourceData, (), Timestamp, StorageDiff>> + Send>,
542 > + Send
543 + Sync
544 + 'static,
545{
546 fn spawn(
549 id: GlobalId,
550 write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
551 read_handle_fn: R,
552 read_only: bool,
553 now: NowFn,
554 introspection_config: DifferentialIntrospectionConfig,
555 ) -> (DifferentialWriteChannel, WriteTask, ShutdownSender) {
556 let (tx, rx) = mpsc::unbounded_channel();
557 let (shutdown_tx, shutdown_rx) = oneshot::channel();
558
559 let upper_tick_interval = tokio::time::interval(Duration::from_millis(DEFAULT_TICK_MS));
560
561 let task = Self {
562 id,
563 write_handle,
564 read_handle_fn,
565 read_only,
566 now,
567 upper_tick_interval,
568 cmd_rx: rx,
569 shutdown_rx,
570 desired: Vec::new(),
571 to_write: Vec::new(),
572 current_upper: Timestamp::MIN,
573 };
574
575 let handle = mz_ore::task::spawn(
576 || format!("CollectionManager-differential_write_task-{id}"),
577 async move {
578 if !task.read_only {
579 task.prepare(introspection_config).await;
580 }
581 let res = task.run().await;
582
583 match res {
584 ControlFlow::Break(reason) => {
585 info!("write_task-{} ending: {}", id, reason);
586 }
587 c @ ControlFlow::Continue(_) => {
588 unreachable!(
589 "cannot break out of the loop with a Continue, but got: {:?}",
590 c
591 );
592 }
593 }
594 },
595 );
596
597 (tx, handle.abort_on_drop(), shutdown_tx)
598 }
599
600 async fn prepare(&self, introspection_config: DifferentialIntrospectionConfig) {
606 tracing::info!(%self.id, ?introspection_config.introspection_type, "preparing differential introspection collection for writes");
607
608 match introspection_config.introspection_type {
609 IntrospectionType::ShardMapping => {
610 }
612 IntrospectionType::Frontiers | IntrospectionType::ReplicaFrontiers => {
613 }
616 IntrospectionType::StorageSourceStatistics => {
617 let prev = snapshot_statistics(
618 self.id,
619 introspection_config.recent_upper,
620 &introspection_config.storage_collections,
621 )
622 .await;
623
624 let scraper_token = statistics::spawn_statistics_scraper(
625 self.id.clone(),
626 introspection_config.collection_manager,
628 Arc::clone(&introspection_config.source_statistics),
629 prev,
630 introspection_config.statistics_interval.clone(),
631 introspection_config.statistics_interval_receiver.clone(),
632 introspection_config.statistics_retention_duration,
633 introspection_config.metrics,
634 );
635 let web_token = statistics::spawn_webhook_statistics_scraper(
636 introspection_config.source_statistics,
637 introspection_config.statistics_interval,
638 introspection_config.statistics_interval_receiver,
639 );
640
641 introspection_config
644 .introspection_tokens
645 .lock()
646 .expect("poisoned")
647 .insert(self.id, Box::new((scraper_token, web_token)));
648 }
649 IntrospectionType::StorageSinkStatistics => {
650 let prev = snapshot_statistics(
651 self.id,
652 introspection_config.recent_upper,
653 &introspection_config.storage_collections,
654 )
655 .await;
656
657 let scraper_token = statistics::spawn_statistics_scraper(
658 self.id.clone(),
659 introspection_config.collection_manager,
660 Arc::clone(&introspection_config.sink_statistics),
661 prev,
662 introspection_config.statistics_interval,
663 introspection_config.statistics_interval_receiver,
664 introspection_config.statistics_retention_duration,
665 introspection_config.metrics,
666 );
667
668 introspection_config
671 .introspection_tokens
672 .lock()
673 .expect("poisoned")
674 .insert(self.id, scraper_token);
675 }
676
677 IntrospectionType::ComputeDependencies
678 | IntrospectionType::ComputeOperatorHydrationStatus
679 | IntrospectionType::ComputeMaterializedViewRefreshes
680 | IntrospectionType::ComputeErrorCounts
681 | IntrospectionType::ComputeHydrationTimes
682 | IntrospectionType::ComputeObjectArrangementSizes => {
683 }
686
687 introspection_type @ IntrospectionType::ReplicaMetricsHistory
688 | introspection_type @ IntrospectionType::WallclockLagHistory
689 | introspection_type @ IntrospectionType::WallclockLagHistogram
690 | introspection_type @ IntrospectionType::PreparedStatementHistory
691 | introspection_type @ IntrospectionType::StatementExecutionHistory
692 | introspection_type @ IntrospectionType::SessionHistory
693 | introspection_type @ IntrospectionType::StatementLifecycleHistory
694 | introspection_type @ IntrospectionType::SqlText
695 | introspection_type @ IntrospectionType::SourceStatusHistory
696 | introspection_type @ IntrospectionType::SinkStatusHistory
697 | introspection_type @ IntrospectionType::PrivatelinkConnectionStatusHistory
698 | introspection_type @ IntrospectionType::ReplicaStatusHistory => {
699 unreachable!("not differential collection: {introspection_type:?}")
700 }
701 }
702 }
703
/// Main loop of the differential write task.
///
/// Never returns `ControlFlow::Continue`: the loop only ends by returning a
/// `ControlFlow::Break` carrying a human-readable reason, either directly or propagated with
/// `?` from `handle_updates` / `tick_upper`.
async fn run(mut self) -> ControlFlow<String> {
    let mut updates = Vec::new();
    loop {
        tokio::select! {
            // `biased` makes `select!` poll the branches in the order they are written:
            // shutdown first, then queued commands, then the periodic upper tick.
            biased;

            // Shutdown requested: fail all queued commands and stop.
            _ = &mut self.shutdown_rx => {
                self.handle_shutdown();

                return ControlFlow::Break("graceful shutdown".to_string());
            }

            // Drain queued commands into `updates` and apply them as one batch.
            () = recv_all_commands(&mut self.cmd_rx, &mut updates) => {
                // Coming back with nothing is treated as all senders being gone.
                if updates.is_empty() {
                    return ControlFlow::Break("sender has been dropped".to_string());
                }
                self.handle_updates(&mut updates).await?;
            }

            // No commands pending: advance the upper, unless the task is read-only.
            _ = self.upper_tick_interval.tick() => {
                if self.read_only {
                    continue;
                }
                self.tick_upper().await?;
            },
        }
    }
}
741
742 async fn tick_upper(&mut self) -> ControlFlow<String> {
743 let now = Timestamp::from((self.now)());
744
745 if now <= self.current_upper {
746 return ControlFlow::Continue(());
749 }
750
751 assert!(!self.read_only);
752 let res = self
753 .write_handle
754 .compare_and_append_batch(
755 &mut [],
756 Antichain::from_elem(self.current_upper),
757 Antichain::from_elem(now),
758 true,
759 )
760 .await
761 .expect("valid usage");
762 match res {
763 Ok(()) => {
765 tracing::debug!(%self.id, "bumped upper of differential collection");
766 self.current_upper = now;
767 }
768 Err(err) => {
769 let actual_upper = if let Some(ts) = err.current.as_option() {
774 *ts
775 } else {
776 return ControlFlow::Break("upper is the empty antichain".to_string());
777 };
778
779 tracing::info!(%self.id, ?actual_upper, expected_upper = ?self.current_upper, "upper mismatch while bumping upper, syncing to persist state");
780
781 self.current_upper = actual_upper;
782
783 self.sync_to_persist().await;
784 }
785 }
786
787 ControlFlow::Continue(())
788 }
789
790 fn handle_shutdown(&mut self) {
791 let mut senders = Vec::new();
792
793 self.cmd_rx.close();
795
796 while let Ok((_batch, sender)) = self.cmd_rx.try_recv() {
798 senders.push(sender);
799 }
800
801 notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));
807 }
808
809 async fn handle_updates(
810 &mut self,
811 batch: &mut Vec<(StorageWriteOp, oneshot::Sender<Result<(), StorageError>>)>,
812 ) -> ControlFlow<String> {
813 let batch_duration_ms = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT;
815
816 let use_batch_now = Instant::now();
817 let min_time_to_complete = use_batch_now + batch_duration_ms;
818
819 tracing::debug!(
820 ?use_batch_now,
821 ?batch_duration_ms,
822 ?min_time_to_complete,
823 "batch duration",
824 );
825
826 let mut responders = Vec::with_capacity(batch.len());
827 for (op, tx) in batch.drain(..) {
828 self.apply_write_op(op);
829 responders.push(tx);
830 }
831
832 consolidation::consolidate(&mut self.desired);
834 consolidation::consolidate(&mut self.to_write);
835
836 self.upper_tick_interval.reset();
847
848 self.write_to_persist(responders).await?;
849
850 tokio::time::sleep_until(min_time_to_complete).await;
855
856 ControlFlow::Continue(())
857 }
858
859 fn apply_write_op(&mut self, op: StorageWriteOp) {
861 match op {
862 StorageWriteOp::Append { updates } => {
863 self.desired.extend_from_slice(&updates);
864 self.to_write.extend(updates);
865 }
866 StorageWriteOp::Delete { filter } => {
867 let to_delete = self.desired.extract_if(.., |(row, _)| filter(row));
868 let retractions = to_delete.map(|(row, diff)| (row, -diff));
869 self.to_write.extend(retractions);
870 }
871 }
872 }
873
/// Writes `to_write` to persist, retrying on upper mismatches.
///
/// On success every responder is notified with `Ok(())`, `current_upper` advances and
/// `to_write` is cleared. If the retries run out, every responder is notified with
/// `StorageError::InvalidUppers` and the method still returns `Continue`. The only `Break` is
/// when persist reports the empty antichain as the upper.
///
/// In read-only mode nothing is written and the responders are dropped without being notified.
async fn write_to_persist(
    &mut self,
    responders: Vec<oneshot::Sender<Result<(), StorageError>>>,
) -> ControlFlow<String> {
    if self.read_only {
        tracing::debug!(%self.id, "not writing to differential collection: read-only");
        return ControlFlow::Continue(());
    }

    // Bounded retry budget: at most 20 tries, backing off from 1s by a factor of 1.25 and
    // clamped at 3s.
    let retries = Retry::default()
        .initial_backoff(Duration::from_secs(1))
        .clamp_backoff(Duration::from_secs(3))
        .factor(1.25)
        .max_tries(20)
        .into_retry_stream();
    let mut retries = Box::pin(retries);

    loop {
        let now = Timestamp::from((self.now)());
        // The new upper must be strictly beyond the current one even if the clock is behind.
        let new_upper = std::cmp::max(now, self.current_upper.step_forward());

        // All updates are written at the current upper.
        let updates_to_write = self
            .to_write
            .iter()
            .map(|(row, diff)| {
                (
                    (SourceData(Ok(row.clone())), ()),
                    self.current_upper,
                    diff.into_inner(),
                )
            })
            .collect::<Vec<_>>();

        assert!(!self.read_only);
        let res = self
            .write_handle
            .compare_and_append(
                updates_to_write,
                Antichain::from_elem(self.current_upper),
                Antichain::from_elem(new_upper),
            )
            .await
            .expect("valid usage");
        match res {
            Ok(()) => {
                notify_listeners(responders, || Ok(()));

                self.current_upper = new_upper;

                // Everything pending has been written; `desired` keeps the full contents.
                self.to_write.clear();

                tracing::debug!(%self.id, "appended to differential collection");

                break;
            }
            Err(err) => {
                // The expected upper did not match: find out where the upper actually is.
                let actual_upper = if let Some(ts) = err.current.as_option() {
                    *ts
                } else {
                    return ControlFlow::Break("upper is the empty antichain".to_string());
                };

                tracing::info!(%self.id, ?actual_upper, expected_upper = ?self.current_upper, "retrying append for differential collection");

                // Out of retries: report the failure and give up on this batch.
                if retries.next().await.is_none() {
                    let invalid_upper = InvalidUpper {
                        id: self.id,
                        current_upper: err.current,
                    };
                    notify_listeners(responders, || {
                        Err(StorageError::InvalidUppers(vec![invalid_upper.clone()]))
                    });
                    error!(
                        "exhausted retries when appending to managed collection {}",
                        self.id
                    );
                    break;
                }

                self.current_upper = actual_upper;

                // Recompute `to_write` against what is actually in persist before retrying.
                self.sync_to_persist().await;

                debug!(
                    "Retrying invalid-uppers error while appending to differential collection {}",
                    self.id
                );
            }
        }
    }

    ControlFlow::Continue(())
}
989
990 async fn sync_to_persist(&mut self) {
998 let mut read_handle = (self.read_handle_fn)().await;
999 let as_of = self.current_upper.step_back().unwrap_or(Timestamp::MIN);
1000 let as_of = Antichain::from_elem(as_of);
1001 let snapshot = read_handle.snapshot_and_fetch(as_of).await;
1002
1003 let mut negated_oks = match snapshot {
1004 Ok(contents) => {
1005 let mut snapshot = Vec::with_capacity(contents.len());
1006 for ((data, _), _, diff) in contents {
1007 let row = data.0.unwrap();
1008 snapshot.push((row, -Diff::from(diff)));
1009 }
1010 snapshot
1011 }
1012 Err(e) => panic!("read before since: {e:?}"),
1013 };
1014
1015 self.to_write.clear();
1016 self.to_write.extend(self.desired.iter().cloned());
1017 self.to_write.append(&mut negated_oks);
1018 consolidation::consolidate(&mut self.to_write);
1019 }
1020}
1021
/// Everything `AppendOnlyWriteTask::prepare` needs to set up an append-only introspection
/// collection before the task starts writing.
pub(crate) struct AppendOnlyIntrospectionConfig {
    /// Which introspection collection is being prepared; selects the preparation branch.
    pub(crate) introspection_type: IntrospectionType,
    /// Passed to `partially_truncate_metrics_history` for the metrics/lag history collections.
    pub(crate) config_set: Arc<ConfigSet>,
    /// Passed to the `*_status_history_desc` helpers for the status history collections.
    pub(crate) parameters: StorageParameters,
    /// Passed to the truncation helpers.
    pub(crate) storage_collections: Arc<dyn StorageCollections + Send + Sync>,
}
1028
/// State of the background task that writes one append-only collection.
struct AppendOnlyWriteTask {
    /// Id of the collection this task writes to; also used in the task name and log lines.
    id: GlobalId,
    /// Handle used for all appends.
    write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
    /// When set, the task skips `prepare`, rejects appends with `StorageError::ReadOnly` and
    /// does nothing on the periodic tick.
    read_only: bool,
    /// Clock used to choose the minimum timestamp of each append.
    now: NowFn,
    /// Batch duration in milliseconds, applied when `id` is a `GlobalId::User`.
    user_batch_duration_ms: Arc<AtomicU64>,
    /// Receiving half of the command channel.
    rx: mpsc::UnboundedReceiver<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError>>,
    )>,

    /// Resolves when the task is asked to shut down.
    shutdown_rx: oneshot::Receiver<()>,
    /// Last known status per `(id, replica)`. `Some` only for the source and sink status
    /// history collections, where it is used to filter status updates in `process_updates`.
    previous_statuses: Option<BTreeMap<(GlobalId, Option<ReplicaId>), Status>>,
}
1051
1052impl AppendOnlyWriteTask {
1053 fn spawn(
1061 id: GlobalId,
1062 write_handle: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1063 read_only: bool,
1064 now: NowFn,
1065 user_batch_duration_ms: Arc<AtomicU64>,
1066 introspection_config: Option<AppendOnlyIntrospectionConfig>,
1067 ) -> (AppendOnlyWriteChannel, WriteTask, ShutdownSender) {
1068 let (tx, rx) = mpsc::unbounded_channel();
1069 let (shutdown_tx, shutdown_rx) = oneshot::channel();
1070
1071 let previous_statuses: Option<BTreeMap<(GlobalId, Option<ReplicaId>), Status>> =
1072 match introspection_config
1073 .as_ref()
1074 .map(|config| config.introspection_type)
1075 {
1076 Some(IntrospectionType::SourceStatusHistory)
1077 | Some(IntrospectionType::SinkStatusHistory) => Some(BTreeMap::new()),
1078
1079 Some(IntrospectionType::ReplicaMetricsHistory)
1080 | Some(IntrospectionType::WallclockLagHistory)
1081 | Some(IntrospectionType::WallclockLagHistogram)
1082 | Some(IntrospectionType::PrivatelinkConnectionStatusHistory)
1083 | Some(IntrospectionType::ReplicaStatusHistory)
1084 | Some(IntrospectionType::PreparedStatementHistory)
1085 | Some(IntrospectionType::StatementExecutionHistory)
1086 | Some(IntrospectionType::SessionHistory)
1087 | Some(IntrospectionType::StatementLifecycleHistory)
1088 | Some(IntrospectionType::SqlText)
1089 | None => None,
1090
1091 Some(introspection_type @ IntrospectionType::ShardMapping)
1092 | Some(introspection_type @ IntrospectionType::Frontiers)
1093 | Some(introspection_type @ IntrospectionType::ReplicaFrontiers)
1094 | Some(introspection_type @ IntrospectionType::StorageSourceStatistics)
1095 | Some(introspection_type @ IntrospectionType::StorageSinkStatistics)
1096 | Some(introspection_type @ IntrospectionType::ComputeDependencies)
1097 | Some(introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus)
1098 | Some(introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes)
1099 | Some(introspection_type @ IntrospectionType::ComputeErrorCounts)
1100 | Some(introspection_type @ IntrospectionType::ComputeHydrationTimes)
1101 | Some(introspection_type @ IntrospectionType::ComputeObjectArrangementSizes) => {
1102 unreachable!("not append-only collection: {introspection_type:?}")
1103 }
1104 };
1105
1106 let mut task = Self {
1107 id,
1108 write_handle,
1109 rx,
1110 shutdown_rx,
1111 read_only,
1112 now,
1113 user_batch_duration_ms,
1114 previous_statuses,
1115 };
1116
1117 let handle = mz_ore::task::spawn(
1118 || format!("CollectionManager-append_only_write_task-{id}"),
1119 async move {
1120 if !task.read_only {
1121 task.prepare(introspection_config).await;
1122 }
1123 task.run().await;
1124 },
1125 );
1126
1127 (tx, handle.abort_on_drop(), shutdown_tx)
1128 }
1129
1130 async fn prepare(&mut self, introspection_config: Option<AppendOnlyIntrospectionConfig>) {
1135 let Some(AppendOnlyIntrospectionConfig {
1136 introspection_type,
1137 config_set,
1138 parameters,
1139 storage_collections,
1140 }) = introspection_config
1141 else {
1142 return;
1143 };
1144 let initial_statuses = match introspection_type {
1145 IntrospectionType::ReplicaMetricsHistory
1146 | IntrospectionType::WallclockLagHistory
1147 | IntrospectionType::WallclockLagHistogram => {
1148 let result = partially_truncate_metrics_history(
1149 self.id,
1150 introspection_type,
1151 &mut self.write_handle,
1152 config_set,
1153 self.now.clone(),
1154 storage_collections,
1155 )
1156 .await;
1157 if let Err(error) = result {
1158 soft_panic_or_log!(
1159 "error truncating metrics history: {error} (type={introspection_type:?})"
1160 );
1161 }
1162 Vec::new()
1163 }
1164
1165 IntrospectionType::PrivatelinkConnectionStatusHistory => {
1166 partially_truncate_status_history(
1167 self.id,
1168 IntrospectionType::PrivatelinkConnectionStatusHistory,
1169 &mut self.write_handle,
1170 privatelink_status_history_desc(¶meters),
1171 self.now.clone(),
1172 &storage_collections,
1173 )
1174 .await;
1175 Vec::new()
1176 }
1177 IntrospectionType::ReplicaStatusHistory => {
1178 partially_truncate_status_history(
1179 self.id,
1180 IntrospectionType::ReplicaStatusHistory,
1181 &mut self.write_handle,
1182 replica_status_history_desc(¶meters),
1183 self.now.clone(),
1184 &storage_collections,
1185 )
1186 .await;
1187 Vec::new()
1188 }
1189
1190 IntrospectionType::PreparedStatementHistory
1193 | IntrospectionType::StatementExecutionHistory
1194 | IntrospectionType::SessionHistory
1195 | IntrospectionType::StatementLifecycleHistory
1196 | IntrospectionType::SqlText => {
1197 Vec::new()
1202 }
1203
1204 IntrospectionType::SourceStatusHistory => {
1205 let last_status_per_id = partially_truncate_status_history(
1206 self.id,
1207 IntrospectionType::SourceStatusHistory,
1208 &mut self.write_handle,
1209 source_status_history_desc(¶meters),
1210 self.now.clone(),
1211 &storage_collections,
1212 )
1213 .await;
1214
1215 let status_col = MZ_SOURCE_STATUS_HISTORY_DESC
1216 .get_by_name(&ColumnName::from("status"))
1217 .expect("schema has not changed")
1218 .0;
1219
1220 last_status_per_id
1221 .into_iter()
1222 .map(|(id, row)| {
1223 (
1224 id,
1225 Status::from_str(
1226 row.iter()
1227 .nth(status_col)
1228 .expect("schema has not changed")
1229 .unwrap_str(),
1230 )
1231 .expect("statuses must be uncorrupted"),
1232 )
1233 })
1234 .collect()
1235 }
1236 IntrospectionType::SinkStatusHistory => {
1237 let last_status_per_id = partially_truncate_status_history(
1238 self.id,
1239 IntrospectionType::SinkStatusHistory,
1240 &mut self.write_handle,
1241 sink_status_history_desc(¶meters),
1242 self.now.clone(),
1243 &storage_collections,
1244 )
1245 .await;
1246
1247 let status_col = MZ_SINK_STATUS_HISTORY_DESC
1248 .get_by_name(&ColumnName::from("status"))
1249 .expect("schema has not changed")
1250 .0;
1251
1252 last_status_per_id
1253 .into_iter()
1254 .map(|(id, row)| {
1255 (
1256 id,
1257 Status::from_str(
1258 row.iter()
1259 .nth(status_col)
1260 .expect("schema has not changed")
1261 .unwrap_str(),
1262 )
1263 .expect("statuses must be uncorrupted"),
1264 )
1265 })
1266 .collect()
1267 }
1268
1269 introspection_type @ IntrospectionType::ShardMapping
1270 | introspection_type @ IntrospectionType::Frontiers
1271 | introspection_type @ IntrospectionType::ReplicaFrontiers
1272 | introspection_type @ IntrospectionType::StorageSourceStatistics
1273 | introspection_type @ IntrospectionType::StorageSinkStatistics
1274 | introspection_type @ IntrospectionType::ComputeDependencies
1275 | introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus
1276 | introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes
1277 | introspection_type @ IntrospectionType::ComputeErrorCounts
1278 | introspection_type @ IntrospectionType::ComputeHydrationTimes
1279 | introspection_type @ IntrospectionType::ComputeObjectArrangementSizes => {
1280 unreachable!("not append-only collection: {introspection_type:?}")
1281 }
1282 };
1283 if let Some(previous_statuses) = &mut self.previous_statuses {
1284 previous_statuses.extend(initial_statuses);
1285 }
1286 }
1287
/// Main loop of the append-only write task.
///
/// Runs until shutdown is requested or `recv_all_commands` comes back with an empty batch,
/// then logs that the task is ending.
async fn run(mut self) {
    let mut interval = tokio::time::interval(Duration::from_millis(DEFAULT_TICK_MS));

    let mut batch: Vec<(Vec<_>, _)> = Vec::new();

    'run: loop {
        tokio::select! {
            // `biased` makes `select!` poll the branches in the order they are written:
            // shutdown first, then queued commands, then the periodic tick.
            biased;

            // Shutdown requested: fail all queued commands and stop.
            _ = &mut self.shutdown_rx => {
                let mut senders = Vec::new();

                // Closing first ensures nothing new can be queued while the channel is drained.
                self.rx.close();

                while let Ok((_batch, sender)) = self.rx.try_recv() {
                    senders.push(sender);
                }

                notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));

                break 'run;
            }

            // Drain queued commands into `batch` and append them in one go.
            () = recv_all_commands(&mut self.rx, &mut batch) => {
                // Coming back with nothing ends the task.
                if batch.is_empty() {
                    break 'run;
                }

                // User collections use the configurable duration; everything else the default.
                let batch_duration_ms = match self.id {
                    GlobalId::User(_) => Duration::from_millis(
                        self.user_batch_duration_ms.load(Ordering::Relaxed),
                    ),
                    _ => STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT,
                };
                let use_batch_now = Instant::now();
                let min_time_to_complete = use_batch_now + batch_duration_ms;

                tracing::debug!(
                    ?use_batch_now,
                    ?batch_duration_ms,
                    ?min_time_to_complete,
                    "batch duration",
                );

                // An append is about to happen, so the periodic tick can start over.
                interval.reset();

                let capacity: usize = batch
                    .iter()
                    .map(|(rows, _)| rows.len())
                    .sum();
                let mut all_rows = Vec::with_capacity(capacity);
                let mut responders = Vec::with_capacity(batch.len());

                for (updates, responder) in batch.drain(..) {
                    let rows = self.process_updates(updates);

                    all_rows.extend(
                        rows.map(|(row, diff)| TimestamplessUpdate { row, diff }),
                    );
                    responders.push(responder);
                }

                // Read-only: reject the whole batch. Note that this path skips the sleep below.
                if self.read_only {
                    tracing::warn!(%self.id, ?all_rows, "append while in read-only mode");
                    notify_listeners(responders, || Err(StorageError::ReadOnly));
                    continue;
                }

                let at_least = Timestamp::from((self.now)());

                // Responders are notified with `Ok` even if filtering left nothing to write.
                if !all_rows.is_empty() {
                    monotonic_append(&mut self.write_handle, all_rows, at_least).await;
                }
                notify_listeners(responders, || Ok(()));

                // Rate limit: do not take the next batch before the batch duration has elapsed.
                tokio::time::sleep_until(min_time_to_complete).await;
            }

            // No commands pending: do an empty append at the current time, unless read-only.
            _ = interval.tick() => {
                if self.read_only {
                    continue;
                }

                let now = Timestamp::from((self.now)());
                let updates = vec![];
                let at_least = now;

                monotonic_append(&mut self.write_handle, updates, at_least).await;
            },
        }
    }

    info!("write_task-{} ending", self.id);
}
1420
1421 fn process_updates(
1424 &mut self,
1425 updates: Vec<AppendOnlyUpdate>,
1426 ) -> impl Iterator<Item = (Row, Diff)> {
1427 let updates = if let Some(previous_statuses) = &mut self.previous_statuses {
1428 let new: Vec<_> = updates
1429 .into_iter()
1430 .filter(|r| match r {
1431 AppendOnlyUpdate::Row(_) => true,
1432 AppendOnlyUpdate::Status(update) => {
1433 match (
1434 previous_statuses
1435 .get(&(update.id, update.replica_id))
1436 .as_deref(),
1437 &update.status,
1438 ) {
1439 (None, _) => true,
1440 (Some(old), new) => old.superseded_by(*new),
1441 }
1442 }
1443 })
1444 .collect();
1445 previous_statuses.extend(new.iter().filter_map(|update| match update {
1446 AppendOnlyUpdate::Row(_) => None,
1447 AppendOnlyUpdate::Status(update) => {
1448 Some(((update.id, update.replica_id), update.status))
1449 }
1450 }));
1451 new
1452 } else {
1453 updates
1454 };
1455
1456 updates.into_iter().map(AppendOnlyUpdate::into_row)
1457 }
1458}
1459
1460async fn partially_truncate_metrics_history(
1467 id: GlobalId,
1468 introspection_type: IntrospectionType,
1469 write_handle: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1470 config_set: Arc<ConfigSet>,
1471 now: NowFn,
1472 storage_collections: Arc<dyn StorageCollections + Send + Sync>,
1473) -> Result<(), anyhow::Error> {
1474 let (keep_duration, occurred_at_col) = match introspection_type {
1475 IntrospectionType::ReplicaMetricsHistory => (
1476 REPLICA_METRICS_HISTORY_RETENTION_INTERVAL.get(&config_set),
1477 REPLICA_METRICS_HISTORY_DESC
1478 .get_by_name(&ColumnName::from("occurred_at"))
1479 .expect("schema has not changed")
1480 .0,
1481 ),
1482 IntrospectionType::WallclockLagHistory => (
1483 WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL.get(&config_set),
1484 WALLCLOCK_LAG_HISTORY_DESC
1485 .get_by_name(&ColumnName::from("occurred_at"))
1486 .expect("schema has not changed")
1487 .0,
1488 ),
1489 IntrospectionType::WallclockLagHistogram => (
1490 WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL.get(&config_set),
1491 WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC
1492 .get_by_name(&ColumnName::from("period_start"))
1493 .expect("schema has not changed")
1494 .0,
1495 ),
1496 _ => panic!("not a metrics history: {introspection_type:?}"),
1497 };
1498
1499 let upper = write_handle.fetch_recent_upper().await;
1500 let Some(upper_ts) = upper.as_option() else {
1501 bail!("collection is sealed");
1502 };
1503 let Some(as_of_ts) = upper_ts.step_back() else {
1504 return Ok(()); };
1506
1507 let mut rows = storage_collections
1508 .snapshot_cursor(id, as_of_ts)
1509 .await
1510 .map_err(|e| anyhow!("reading snapshot: {e:?}"))?;
1511
1512 let now = mz_ore::now::to_datetime(now());
1513 let keep_since = now - keep_duration;
1514
1515 let old_upper_ts = *upper_ts;
1520 let new_upper_ts = old_upper_ts.step_forward();
1521
1522 let mut builder = write_handle.builder(Antichain::from_elem(old_upper_ts));
1524 let mut datum_vec = DatumVec::new();
1526 while let Some(chunk) = rows.next().await {
1527 for (data, _t, diff) in chunk {
1528 let Ok(row) = &data.0 else { continue };
1529 let datums = datum_vec.borrow_with(row);
1530 let occurred_at = datums[occurred_at_col].unwrap_timestamptz();
1531 if *occurred_at >= keep_since {
1532 continue;
1533 }
1534 let diff = -diff;
1535 match builder.add(&data, &(), &old_upper_ts, &diff).await? {
1536 Added::Record => {}
1537 Added::RecordAndParts => {
1538 debug!(?id, "added part to builder");
1539 }
1540 }
1541 }
1542 }
1543
1544 let mut updates = builder.finish(Antichain::from_elem(new_upper_ts)).await?;
1545 let mut batches = vec![&mut updates];
1546
1547 write_handle
1548 .compare_and_append_batch(
1549 batches.as_mut_slice(),
1550 Antichain::from_elem(old_upper_ts),
1551 Antichain::from_elem(new_upper_ts),
1552 true,
1553 )
1554 .await
1555 .expect("valid usage")
1556 .map_err(|e| anyhow!("appending retractions: {e:?}"))
1557}
1558
1559pub(crate) async fn partially_truncate_status_history<K>(
1569 id: GlobalId,
1570 introspection_type: IntrospectionType,
1571 write_handle: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1572 status_history_desc: StatusHistoryDesc<K>,
1573 now: NowFn,
1574 storage_collections: &Arc<dyn StorageCollections + Send + Sync>,
1575) -> BTreeMap<K, Row>
1576where
1577 K: Clone + Debug + Ord + Send + Sync,
1578{
1579 let upper = write_handle.fetch_recent_upper().await.clone();
1580
1581 let mut rows = match upper.as_option() {
1582 Some(f) if f > &Timestamp::MIN => {
1583 let as_of = f.step_back().unwrap();
1584
1585 storage_collections
1586 .snapshot_cursor(id, as_of)
1587 .await
1588 .expect("snapshot succeeds")
1589 }
1590 _ => return BTreeMap::new(),
1593 };
1594
1595 let mut latest_row_per_key: BTreeMap<K, (CheckedTimestamp<DateTime<Utc>>, Row)> =
1597 BTreeMap::new();
1598
1599 let expected_upper = upper.into_option().expect("checked above");
1604 let new_upper = expected_upper.step_forward();
1605
1606 let mut deletions = write_handle.builder(Antichain::from_elem(expected_upper));
1607
1608 let mut handle_row = {
1609 let latest_row_per_key = &mut latest_row_per_key;
1610 let mut datum_vec = DatumVec::new();
1612 move |row: &Row, diff| {
1613 let datums = datum_vec.borrow_with(row);
1614 let key = (status_history_desc.extract_key)(&datums);
1615 let timestamp = (status_history_desc.extract_time)(&datums);
1616
1617 assert!(
1618 diff > 0,
1619 "only know how to operate over consolidated data with diffs > 0, \
1620 found diff {diff} for object {key:?} in {introspection_type:?}",
1621 );
1622
1623 match latest_row_per_key.get(&key) {
1625 Some(existing) if &existing.0 > ×tamp => {}
1626 _ => {
1627 latest_row_per_key.insert(key.clone(), (timestamp, row.clone()));
1628 }
1629 };
1630 (key, timestamp)
1631 }
1632 };
1633
1634 match status_history_desc.retention_policy {
1635 StatusHistoryRetentionPolicy::LastN(n) => {
1636 let mut last_n_entries_per_key: BTreeMap<
1638 K,
1639 BinaryHeap<Reverse<(CheckedTimestamp<DateTime<Utc>>, Row)>>,
1640 > = BTreeMap::new();
1641
1642 while let Some(chunk) = rows.next().await {
1643 for (data, _t, diff) in chunk {
1644 let Ok(row) = &data.0 else { continue };
1645 let (key, timestamp) = handle_row(row, diff);
1646
1647 let entries = last_n_entries_per_key.entry(key).or_default();
1650 for _ in 0..diff {
1651 entries.push(Reverse((timestamp, row.clone())));
1661
1662 while entries.len() > n {
1665 if let Some(Reverse((_, r))) = entries.pop() {
1666 deletions
1667 .add(&SourceData(Ok(r)), &(), &expected_upper, &-1)
1668 .await
1669 .expect("usage should be valid");
1670 }
1671 }
1672 }
1673 }
1674 }
1675 }
1676 StatusHistoryRetentionPolicy::TimeWindow(time_window) => {
1677 let now = mz_ore::now::to_datetime(now());
1679 let keep_since = now - time_window;
1680
1681 while let Some(chunk) = rows.next().await {
1683 for (data, _t, diff) in chunk {
1684 let Ok(row) = &data.0 else { continue };
1685 let (_, timestamp) = handle_row(row, diff);
1686
1687 if *timestamp < keep_since {
1688 deletions
1689 .add(&data, &(), &expected_upper, &-1)
1690 .await
1691 .expect("usage should be valid");
1692 }
1693 }
1694 }
1695 }
1696 }
1697
1698 let mut updates = deletions
1699 .finish(Antichain::from_elem(new_upper))
1700 .await
1701 .expect("expected valid usage");
1702 let mut batches = vec![&mut updates];
1703
1704 let res = write_handle
1706 .compare_and_append_batch(
1707 batches.as_mut_slice(),
1708 Antichain::from_elem(expected_upper),
1709 Antichain::from_elem(new_upper),
1710 true,
1711 )
1712 .await
1713 .expect("usage was valid");
1714
1715 match res {
1716 Ok(_) => {
1717 }
1719 Err(err) => {
1720 info!(
1728 %id, ?expected_upper, current_upper = ?err.current,
1729 "failed to append partial truncation",
1730 );
1731 }
1732 }
1733
1734 latest_row_per_key
1735 .into_iter()
1736 .map(|(key, (_, row))| (key, row))
1737 .collect()
1738}
1739
1740async fn monotonic_append(
1741 write_handle: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1742 updates: Vec<TimestamplessUpdate>,
1743 at_least: Timestamp,
1744) {
1745 let mut expected_upper = write_handle.shared_upper();
1746 loop {
1747 if updates.is_empty() && expected_upper.is_empty() {
1748 return;
1752 }
1753
1754 let upper = expected_upper
1755 .into_option()
1756 .expect("cannot append data to closed collection");
1757
1758 let lower = std::cmp::max(upper, at_least);
1759 let new_upper = lower.step_forward();
1760 let updates = updates
1761 .iter()
1762 .map(|TimestamplessUpdate { row, diff }| {
1763 ((SourceData(Ok(row.clone())), ()), lower, diff.into_inner())
1764 })
1765 .collect::<Vec<_>>();
1766 let res = write_handle
1767 .compare_and_append(
1768 updates,
1769 Antichain::from_elem(upper),
1770 Antichain::from_elem(new_upper),
1771 )
1772 .await
1773 .expect("valid usage");
1774 match res {
1775 Ok(()) => return,
1776 Err(err) => {
1777 expected_upper = err.current;
1778 continue;
1779 }
1780 }
1781 }
1782}
1783
1784fn notify_listeners<T>(
1786 responders: impl IntoIterator<Item = oneshot::Sender<T>>,
1787 result: impl Fn() -> T,
1788) {
1789 for r in responders {
1790 let _ = r.send(result());
1792 }
1793}
1794
1795async fn recv_all_commands<T>(rx: &mut mpsc::UnboundedReceiver<T>, out: &mut Vec<T>) {
1808 if let Some(msg) = rx.recv().await {
1809 out.push(msg);
1810 } else {
1811 return; };
1813
1814 out.reserve(rx.len());
1815 while let Ok(msg) = rx.try_recv() {
1816 out.push(msg);
1817 }
1818
1819 if out.capacity() > out.len() * 4 {
1824 out.shrink_to_fit();
1825 }
1826}
1827
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use super::*;
    use itertools::Itertools;
    use mz_repr::{Datum, Row};
    use mz_storage_client::client::StatusUpdate;
    use mz_storage_client::healthcheck::{
        MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC,
    };

    const ERROR_MESSAGE: &str = "error message";
    const HINT: &str = "hint message";

    /// Builds the row of a `Dropped` status update for user object 1.
    ///
    /// `error` and `hint` are included when given; `with_namespaced_error` adds the
    /// namespaced error `"thing" -> "error"`.
    fn dropped_status_row(
        error: Option<&str>,
        hint: Option<&str>,
        with_namespaced_error: bool,
    ) -> (GlobalId, Status, Row) {
        let id = GlobalId::User(1);
        let status = Status::Dropped;

        let hints: BTreeSet<String> = hint.into_iter().map(str::to_string).collect();
        let namespaced_errors = if with_namespaced_error {
            BTreeMap::from([("thing".to_string(), "error".to_string())])
        } else {
            BTreeMap::new()
        };

        let row = Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: error.map(str::to_string),
            hints,
            namespaced_errors,
            replica_id: None,
        });

        (id, status, row)
    }

    /// Checks that every datum of `row` fits the corresponding column of both the sink
    /// and the source status history schema.
    fn assert_matches_status_history_schemas(row: &Row) {
        for (datum, column_type) in row.iter().zip_eq(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of_sql(column_type));
        }

        for (datum, column_type) in row
            .iter()
            .zip_eq(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types())
        {
            assert!(datum.is_instance_of_sql(column_type));
        }
    }

    /// Checks the id, status and error columns (columns 1 to 3) of `row`.
    fn assert_leading_columns(row: &Row, id: GlobalId, status: Status, error: Option<&str>) {
        let id = id.to_string();
        let expected_error = error.map_or(Datum::Null, Datum::String);

        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id));
        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
        assert_eq!(row.iter().nth(3).unwrap(), expected_error);
    }

    /// Returns the entries of the details map stored in column 4 of `row`.
    fn detail_entries(row: &Row) -> Vec<(&str, Datum<'_>)> {
        row.iter().nth(4).unwrap().unwrap_map().iter().collect()
    }

    /// Checks that `entry` is the `hints` entry and that its first hint is `hint`.
    fn assert_hints_entry(entry: &(&str, Datum<'_>), hint: &str) {
        assert_eq!(entry.0, "hints");
        assert_eq!(
            entry.1.unwrap_list().iter().next().unwrap(),
            Datum::String(hint)
        );
    }

    /// Checks that `entry` is the `namespaced` entry holding `"thing" -> "error"` first.
    fn assert_namespaced_entry(entry: &(&str, Datum<'_>)) {
        assert_eq!(entry.0, "namespaced");
        assert_eq!(
            entry.1.unwrap_map().iter().next().unwrap(),
            ("thing", Datum::String("error"))
        );
    }

    #[mz_ore::test]
    fn test_row() {
        let (id, status, row) = dropped_status_row(Some(ERROR_MESSAGE), Some(HINT), false);

        assert_matches_status_history_schemas(&row);
        assert_leading_columns(&row, id, status, Some(ERROR_MESSAGE));

        let details = detail_entries(&row);
        assert_eq!(details.len(), 1);
        assert_hints_entry(&details[0], HINT);
    }

    #[mz_ore::test]
    fn test_row_without_hint() {
        let (id, status, row) = dropped_status_row(Some(ERROR_MESSAGE), None, false);

        assert_matches_status_history_schemas(&row);
        assert_leading_columns(&row, id, status, Some(ERROR_MESSAGE));

        // Without hints or namespaced errors there is no details map at all.
        assert_eq!(row.iter().nth(4).unwrap(), Datum::Null);
    }

    #[mz_ore::test]
    fn test_row_without_error() {
        let (id, status, row) = dropped_status_row(None, Some(HINT), false);

        assert_matches_status_history_schemas(&row);
        assert_leading_columns(&row, id, status, None);

        let details = detail_entries(&row);
        assert_eq!(details.len(), 1);
        assert_hints_entry(&details[0], HINT);
    }

    #[mz_ore::test]
    fn test_row_with_namespaced() {
        let (id, status, row) = dropped_status_row(Some(ERROR_MESSAGE), None, true);

        assert_matches_status_history_schemas(&row);
        assert_leading_columns(&row, id, status, Some(ERROR_MESSAGE));

        let details = detail_entries(&row);
        assert_eq!(details.len(), 1);
        assert_namespaced_entry(&details[0]);
    }

    #[mz_ore::test]
    fn test_row_with_everything() {
        let (id, status, row) = dropped_status_row(Some(ERROR_MESSAGE), Some(HINT), true);

        assert_matches_status_history_schemas(&row);
        assert_leading_columns(&row, id, status, Some(ERROR_MESSAGE));

        // The hints entry comes first, followed by the namespaced errors.
        let details = detail_entries(&row);
        assert_eq!(details.len(), 2);
        assert_hints_entry(&details[0], HINT);
        assert_namespaced_entry(&details[1]);
    }
}