1use std::any::Any;
61use std::cmp::Reverse;
62use std::collections::{BTreeMap, BinaryHeap};
63use std::fmt::Debug;
64use std::ops::ControlFlow;
65use std::pin::Pin;
66use std::str::FromStr;
67use std::sync::atomic::{AtomicU64, Ordering};
68use std::sync::{Arc, Mutex};
69
70use anyhow::{anyhow, bail};
71use chrono::{DateTime, Utc};
72use differential_dataflow::consolidation;
73use differential_dataflow::lattice::Lattice;
74use futures::future::BoxFuture;
75use futures::stream::StreamExt;
76use futures::{Future, FutureExt};
77use mz_cluster_client::ReplicaId;
78use mz_dyncfg::ConfigSet;
79use mz_ore::now::{EpochMillis, NowFn};
80use mz_ore::retry::Retry;
81use mz_ore::soft_panic_or_log;
82use mz_ore::task::AbortOnDropHandle;
83use mz_persist_client::batch::Added;
84use mz_persist_client::read::ReadHandle;
85use mz_persist_client::write::WriteHandle;
86use mz_persist_types::Codec64;
87use mz_repr::adt::timestamp::CheckedTimestamp;
88use mz_repr::{ColumnName, Diff, GlobalId, Row, TimestampManipulation};
89use mz_storage_client::client::{AppendOnlyUpdate, Status, TimestamplessUpdate};
90use mz_storage_client::controller::{IntrospectionType, MonotonicAppender, StorageWriteOp};
91use mz_storage_client::healthcheck::{
92 MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC, REPLICA_METRICS_HISTORY_DESC,
93 WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC, WALLCLOCK_LAG_HISTORY_DESC,
94};
95use mz_storage_client::metrics::StorageControllerMetrics;
96use mz_storage_client::statistics::ControllerSinkStatistics;
97use mz_storage_client::storage_collections::StorageCollections;
98use mz_storage_types::StorageDiff;
99use mz_storage_types::controller::InvalidUpper;
100use mz_storage_types::dyncfgs::{
101 REPLICA_METRICS_HISTORY_RETENTION_INTERVAL, WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL,
102 WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL,
103};
104use mz_storage_types::parameters::{
105 STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT, StorageParameters,
106};
107use mz_storage_types::sources::SourceData;
108use timely::progress::{Antichain, Timestamp};
109use tokio::sync::{mpsc, oneshot, watch};
110use tokio::time::{Duration, Instant};
111use tracing::{debug, error, info};
112
113use crate::{
114 StatusHistoryDesc, StatusHistoryRetentionPolicy, StorageError, collection_mgmt,
115 privatelink_status_history_desc, replica_status_history_desc, sink_status_history_desc,
116 snapshot_statistics, source_status_history_desc, statistics,
117};
118
/// Default interval, in milliseconds, at which write tasks tick: append-only
/// tasks issue an empty `monotonic_append` and differential tasks bump their
/// upper, even when no updates arrive.
const DEFAULT_TICK_MS: u64 = 1_000;

/// Sending side of the channel over which write ops are submitted to a
/// differential write task; each op is paired with a oneshot on which the
/// task reports the outcome of the write.
type DifferentialWriteChannel<T> =
    mpsc::UnboundedSender<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>;

/// Sending side of the channel over which update batches are submitted to an
/// append-only write task, paired with a oneshot for the write's outcome.
type AppendOnlyWriteChannel<T> = mpsc::UnboundedSender<(
    Vec<AppendOnlyUpdate>,
    oneshot::Sender<Result<(), StorageError<T>>>,
)>;

/// Handle to a spawned write task; the task is aborted when this is dropped.
type WriteTask = AbortOnDropHandle<()>;
/// Oneshot used to request graceful shutdown of a write task.
type ShutdownSender = oneshot::Sender<()>;
134
/// The two flavors of collections managed by the [`CollectionManager`].
pub enum CollectionManagerKind {
    /// Collections that only ever receive new updates (no retractions).
    AppendOnly,
    /// Collections maintained as a consolidated set of `(Row, Diff)` updates,
    /// where deletions are expressed as retractions.
    Differential,
}
151
/// Manages writes to storage-managed collections.
///
/// For every registered collection a dedicated task owns the persist write
/// handle and services write requests arriving over an unbounded channel.
#[derive(Debug, Clone)]
pub struct CollectionManager<T>
where
    T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
    // Whether this manager was created in read-only mode; blind writes panic
    // in that mode and tasks refrain from writing to persist.
    read_only: bool,

    // Write channel, task handle, and shutdown sender for each registered
    // differential collection, keyed by collection id.
    differential_collections:
        Arc<Mutex<BTreeMap<GlobalId, (DifferentialWriteChannel<T>, WriteTask, ShutdownSender)>>>,

    // Write channel, task handle, and shutdown sender for each registered
    // append-only collection, keyed by collection id.
    append_only_collections:
        Arc<Mutex<BTreeMap<GlobalId, (AppendOnlyWriteChannel<T>, WriteTask, ShutdownSender)>>>,

    // Batch duration (ms) for writes to user collections; shared with the
    // write tasks through an atomic so it can be updated at runtime.
    user_batch_duration_ms: Arc<AtomicU64>,
    // Clock used to derive timestamps for writes and upper bumps.
    now: NowFn,
}
181
182impl<T> CollectionManager<T>
192where
193 T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
194{
195 pub(super) fn new(read_only: bool, now: NowFn) -> CollectionManager<T> {
196 let batch_duration_ms: u64 = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT
197 .as_millis()
198 .try_into()
199 .expect("known to fit");
200
201 CollectionManager {
202 read_only,
203 differential_collections: Arc::new(Mutex::new(BTreeMap::new())),
204 append_only_collections: Arc::new(Mutex::new(BTreeMap::new())),
205 user_batch_duration_ms: Arc::new(AtomicU64::new(batch_duration_ms)),
206 now,
207 }
208 }
209
210 pub fn update_user_batch_duration(&self, duration: Duration) {
212 tracing::info!(?duration, "updating user batch duration");
213 let millis: u64 = duration.as_millis().try_into().unwrap_or(u64::MAX);
214 self.user_batch_duration_ms.store(millis, Ordering::Relaxed);
215 }
216
217 pub(super) fn register_differential_collection<R>(
225 &self,
226 id: GlobalId,
227 write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
228 read_handle_fn: R,
229 force_writable: bool,
230 introspection_config: DifferentialIntrospectionConfig<T>,
231 ) where
232 R: FnMut() -> Pin<
233 Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>,
234 > + Send
235 + Sync
236 + 'static,
237 {
238 let mut guard = self
239 .differential_collections
240 .lock()
241 .expect("collection_mgmt panicked");
242
243 if let Some((_writer, task, _shutdown_tx)) = guard.get(&id) {
245 if !task.is_finished() {
247 tracing::error!("Registered a collection twice! {id:?}");
249 return;
250 }
251 }
252
253 let read_only = self.get_read_only(id, force_writable);
254
255 let writer_and_handle = DifferentialWriteTask::spawn(
257 id,
258 write_handle,
259 read_handle_fn,
260 read_only,
261 self.now.clone(),
262 introspection_config,
263 );
264 let prev = guard.insert(id, writer_and_handle);
265
266 if let Some((_, prev_task, _)) = prev {
268 assert!(
269 prev_task.is_finished(),
270 "should only spawn a new task if the previous is finished"
271 );
272 }
273 }
274
275 pub(super) fn register_append_only_collection(
280 &self,
281 id: GlobalId,
282 write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
283 force_writable: bool,
284 introspection_config: Option<AppendOnlyIntrospectionConfig<T>>,
285 ) {
286 let mut guard = self
287 .append_only_collections
288 .lock()
289 .expect("collection_mgmt panicked");
290
291 if let Some((_writer, task, _shutdown_tx)) = guard.get(&id) {
293 if !task.is_finished() {
295 tracing::error!("Registered a collection twice! {id:?}");
297 return;
298 }
299 }
300
301 let read_only = self.get_read_only(id, force_writable);
302
303 let writer_and_handle = AppendOnlyWriteTask::spawn(
305 id,
306 write_handle,
307 read_only,
308 self.now.clone(),
309 Arc::clone(&self.user_batch_duration_ms),
310 introspection_config,
311 );
312 let prev = guard.insert(id, writer_and_handle);
313
314 if let Some((_, prev_task, _)) = prev {
316 assert!(
317 prev_task.is_finished(),
318 "should only spawn a new task if the previous is finished"
319 );
320 }
321 }
322
323 #[mz_ore::instrument(level = "debug")]
328 pub(super) fn unregister_collection(&self, id: GlobalId) -> BoxFuture<'static, ()> {
329 let prev = self
330 .differential_collections
331 .lock()
332 .expect("CollectionManager panicked")
333 .remove(&id);
334
335 if let Some((_prev_writer, prev_task, shutdown_tx)) = prev {
337 let _ = shutdown_tx.send(());
341 return Box::pin(prev_task.map(|_| ()));
342 }
343
344 let prev = self
345 .append_only_collections
346 .lock()
347 .expect("CollectionManager panicked")
348 .remove(&id);
349
350 if let Some((_prev_writer, prev_task, shutdown_tx)) = prev {
352 let _ = shutdown_tx.send(());
356 return Box::pin(prev_task.map(|_| ()));
357 }
358
359 Box::pin(futures::future::ready(()))
360 }
361
362 pub(super) fn append_only_write_sender(&self, id: GlobalId) -> AppendOnlyWriteChannel<T> {
367 let collections = self.append_only_collections.lock().expect("poisoned");
368 match collections.get(&id) {
369 Some((tx, _, _)) => tx.clone(),
370 None => panic!("missing append-only collection: {id}"),
371 }
372 }
373
374 pub(super) fn differential_write_sender(&self, id: GlobalId) -> DifferentialWriteChannel<T> {
379 let collections = self.differential_collections.lock().expect("poisoned");
380 match collections.get(&id) {
381 Some((tx, _, _)) => tx.clone(),
382 None => panic!("missing differential collection: {id}"),
383 }
384 }
385
386 pub(super) fn blind_write(&self, id: GlobalId, updates: Vec<AppendOnlyUpdate>) {
394 if self.read_only {
395 panic!("attempting blind write to {} while in read-only mode", id);
396 }
397
398 if updates.is_empty() {
399 return;
400 }
401
402 let collections = self.append_only_collections.lock().expect("poisoned");
403 match collections.get(&id) {
404 Some((update_tx, _, _)) => {
405 let (tx, _rx) = oneshot::channel();
406 update_tx.send((updates, tx)).expect("rx hung up");
407 }
408 None => panic!("missing append-only collection: {id}"),
409 }
410 }
411
412 pub(super) fn differential_write(&self, id: GlobalId, op: StorageWriteOp) {
420 if op.is_empty_append() {
421 return;
422 }
423
424 let collections = self.differential_collections.lock().expect("poisoned");
425 match collections.get(&id) {
426 Some((update_tx, _, _)) => {
427 let (tx, _rx) = oneshot::channel();
428 update_tx.send((op, tx)).expect("rx hung up");
429 }
430 None => panic!("missing differential collection: {id}"),
431 }
432 }
433
434 pub(super) fn differential_append(&self, id: GlobalId, updates: Vec<(Row, Diff)>) {
440 self.differential_write(id, StorageWriteOp::Append { updates })
441 }
442
443 pub(super) fn monotonic_appender(
446 &self,
447 id: GlobalId,
448 ) -> Result<MonotonicAppender<T>, StorageError<T>> {
449 let guard = self
450 .append_only_collections
451 .lock()
452 .expect("CollectionManager panicked");
453 let tx = guard
454 .get(&id)
455 .map(|(tx, _, _)| tx.clone())
456 .ok_or(StorageError::IdentifierMissing(id))?;
457
458 Ok(MonotonicAppender::new(tx))
459 }
460
461 fn get_read_only(&self, id: GlobalId, force_writable: bool) -> bool {
462 if force_writable {
463 assert!(id.is_system(), "unexpected non-system global id: {id:?}");
464 false
465 } else {
466 self.read_only
467 }
468 }
469}
470
/// Configuration handed to a [`DifferentialWriteTask`] for preparing a
/// differential introspection collection before it accepts writes.
pub(crate) struct DifferentialIntrospectionConfig<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    // A recent upper of the collection, used when snapshotting statistics.
    pub(crate) recent_upper: Antichain<T>,
    // Which introspection collection this task writes to; drives the
    // per-type preparation in `DifferentialWriteTask::prepare`.
    pub(crate) introspection_type: IntrospectionType,
    pub(crate) storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
    pub(crate) collection_manager: collection_mgmt::CollectionManager<T>,
    // Shared statistics state consumed by the spawned scraper tasks.
    pub(crate) source_statistics: Arc<Mutex<statistics::SourceStatistics>>,
    pub(crate) sink_statistics:
        Arc<Mutex<BTreeMap<(GlobalId, Option<ReplicaId>), ControllerSinkStatistics>>>,
    pub(crate) statistics_interval: Duration,
    // Watch channel over which interval updates are delivered to scrapers.
    pub(crate) statistics_interval_receiver: watch::Receiver<Duration>,
    pub(crate) statistics_retention_duration: Duration,
    pub(crate) metrics: StorageControllerMetrics,
    // Registry of tokens keeping scraper tasks alive, keyed by collection id.
    pub(crate) introspection_tokens: Arc<Mutex<BTreeMap<GlobalId, Box<dyn Any + Send + Sync>>>>,
}
488
/// State of the task that services writes to a single differential
/// collection: it maintains a `desired` set of updates and reconciles the
/// persist shard's contents towards it.
struct DifferentialWriteTask<T, R>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
    R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>>
        + Send
        + 'static,
{
    // Id of the collection this task writes to.
    id: GlobalId,

    write_handle: WriteHandle<SourceData, (), T, StorageDiff>,

    // Produces fresh read handles, used by `sync_to_persist` to snapshot the
    // shard after an upper mismatch.
    read_handle_fn: R,

    read_only: bool,

    now: NowFn,

    // Periodic trigger for bumping the collection's upper when idle.
    upper_tick_interval: tokio::time::Interval,

    // Receives write ops, each paired with a oneshot for the result.
    cmd_rx: mpsc::UnboundedReceiver<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>,

    shutdown_rx: oneshot::Receiver<()>,

    // The full set of updates the collection should contain, consolidated.
    desired: Vec<(Row, Diff)>,

    // The pending delta to append so the shard converges to `desired`.
    to_write: Vec<(Row, Diff)>,

    // The upper we believe the shard has; kept in sync with persist by
    // `tick_upper`/`write_to_persist`/`sync_to_persist`.
    current_upper: T,
}
548
impl<T, R> DifferentialWriteTask<T, R>
where
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
    R: FnMut() -> Pin<Box<dyn Future<Output = ReadHandle<SourceData, (), T, StorageDiff>> + Send>>
        + Send
        + Sync
        + 'static,
{
    /// Spawns the write task and returns its command channel, an
    /// abort-on-drop handle to the task, and a shutdown sender.
    fn spawn(
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        read_handle_fn: R,
        read_only: bool,
        now: NowFn,
        introspection_config: DifferentialIntrospectionConfig<T>,
    ) -> (DifferentialWriteChannel<T>, WriteTask, ShutdownSender) {
        let (tx, rx) = mpsc::unbounded_channel();
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        let upper_tick_interval = tokio::time::interval(Duration::from_millis(DEFAULT_TICK_MS));

        // We don't know the shard's actual upper yet; start at the minimum
        // and let the first CaA failure sync us to persist's state.
        let current_upper = T::minimum();

        let task = Self {
            id,
            write_handle,
            read_handle_fn,
            read_only,
            now,
            upper_tick_interval,
            cmd_rx: rx,
            shutdown_rx,
            desired: Vec::new(),
            to_write: Vec::new(),
            current_upper,
        };

        let handle = mz_ore::task::spawn(
            || format!("CollectionManager-differential_write_task-{id}"),
            async move {
                // Preparation performs writes/spawns scrapers, so it is
                // skipped in read-only mode.
                if !task.read_only {
                    task.prepare(introspection_config).await;
                }
                let res = task.run().await;

                match res {
                    ControlFlow::Break(reason) => {
                        info!("write_task-{} ending: {}", id, reason);
                    }
                    c => {
                        // `run` only ever returns via Break.
                        unreachable!(
                            "cannot break out of the loop with a Continue, but got: {:?}",
                            c
                        );
                    }
                }
            },
        );

        (tx, handle.abort_on_drop(), shutdown_tx)
    }

    /// Runs one-time, per-introspection-type preparation before the task
    /// starts writing, e.g. spawning statistics scrapers. Only called when
    /// the task is not read-only.
    async fn prepare(&self, introspection_config: DifferentialIntrospectionConfig<T>) {
        tracing::info!(%self.id, ?introspection_config.introspection_type, "preparing differential introspection collection for writes");

        match introspection_config.introspection_type {
            IntrospectionType::ShardMapping => {
                // No preparation needed.
            }
            IntrospectionType::Frontiers | IntrospectionType::ReplicaFrontiers => {
                // No preparation needed.
            }
            IntrospectionType::StorageSourceStatistics => {
                // Seed the scraper with the statistics currently in the
                // collection so it can retract them as it takes over.
                let prev = snapshot_statistics(
                    self.id,
                    introspection_config.recent_upper,
                    &introspection_config.storage_collections,
                )
                .await;

                let scraper_token = statistics::spawn_statistics_scraper(
                    self.id.clone(),
                    introspection_config.collection_manager,
                    Arc::clone(&introspection_config.source_statistics),
                    prev,
                    introspection_config.statistics_interval.clone(),
                    introspection_config.statistics_interval_receiver.clone(),
                    introspection_config.statistics_retention_duration,
                    introspection_config.metrics,
                );
                let web_token = statistics::spawn_webhook_statistics_scraper(
                    introspection_config.source_statistics,
                    introspection_config.statistics_interval,
                    introspection_config.statistics_interval_receiver,
                );

                // Keep the scraper tasks alive by stashing their tokens;
                // dropping them would stop the scrapers.
                introspection_config
                    .introspection_tokens
                    .lock()
                    .expect("poisoned")
                    .insert(self.id, Box::new((scraper_token, web_token)));
            }
            IntrospectionType::StorageSinkStatistics => {
                let prev = snapshot_statistics(
                    self.id,
                    introspection_config.recent_upper,
                    &introspection_config.storage_collections,
                )
                .await;

                let scraper_token = statistics::spawn_statistics_scraper(
                    self.id.clone(),
                    introspection_config.collection_manager,
                    Arc::clone(&introspection_config.sink_statistics),
                    prev,
                    introspection_config.statistics_interval,
                    introspection_config.statistics_interval_receiver,
                    introspection_config.statistics_retention_duration,
                    introspection_config.metrics,
                );

                introspection_config
                    .introspection_tokens
                    .lock()
                    .expect("poisoned")
                    .insert(self.id, scraper_token);
            }

            IntrospectionType::ComputeDependencies
            | IntrospectionType::ComputeOperatorHydrationStatus
            | IntrospectionType::ComputeMaterializedViewRefreshes
            | IntrospectionType::ComputeErrorCounts
            | IntrospectionType::ComputeHydrationTimes => {
                // No preparation needed.
            }

            // These are append-only collections and must never be routed to a
            // differential write task.
            introspection_type @ IntrospectionType::ReplicaMetricsHistory
            | introspection_type @ IntrospectionType::WallclockLagHistory
            | introspection_type @ IntrospectionType::WallclockLagHistogram
            | introspection_type @ IntrospectionType::PreparedStatementHistory
            | introspection_type @ IntrospectionType::StatementExecutionHistory
            | introspection_type @ IntrospectionType::SessionHistory
            | introspection_type @ IntrospectionType::StatementLifecycleHistory
            | introspection_type @ IntrospectionType::SqlText
            | introspection_type @ IntrospectionType::SourceStatusHistory
            | introspection_type @ IntrospectionType::SinkStatusHistory
            | introspection_type @ IntrospectionType::PrivatelinkConnectionStatusHistory
            | introspection_type @ IntrospectionType::ReplicaStatusHistory => {
                unreachable!("not differential collection: {introspection_type:?}")
            }
        }
    }

    /// Main loop: services shutdown requests, incoming write ops, and the
    /// periodic upper tick, in that (biased) priority order. Only ever
    /// returns `ControlFlow::Break` with the reason for stopping.
    async fn run(mut self) -> ControlFlow<String> {
        let mut updates = Vec::new();
        loop {
            tokio::select! {
                // `biased` makes shutdown win over pending work.
                biased;

                _ = &mut self.shutdown_rx => {
                    self.handle_shutdown();

                    return ControlFlow::Break("graceful shutdown".to_string());
                }

                () = recv_all_commands(&mut self.cmd_rx, &mut updates) => {
                    // An empty batch means the channel closed with nothing
                    // buffered, i.e. all senders are gone.
                    if updates.is_empty() {
                        return ControlFlow::Break("sender has been dropped".to_string());
                    }
                    let _ = self.handle_updates(&mut updates).await?;
                }

                _ = self.upper_tick_interval.tick() => {
                    // Never move the upper in read-only mode.
                    if self.read_only {
                        continue;
                    }
                    let _ = self.tick_upper().await?;
                },
            }
        }
    }

    /// Advances the collection's upper to the current wall-clock time with an
    /// empty append, so readers see progress even without updates. On an
    /// upper mismatch, adopts persist's upper and re-syncs local state.
    async fn tick_upper(&mut self) -> ControlFlow<String> {
        let now = T::from((self.now)());

        // Uppers never regress; nothing to do until the clock passes the
        // current upper.
        if now <= self.current_upper {
            return ControlFlow::Continue(());
        }

        assert!(!self.read_only);
        let res = self
            .write_handle
            .compare_and_append_batch(
                &mut [],
                Antichain::from_elem(self.current_upper.clone()),
                Antichain::from_elem(now.clone()),
                true,
            )
            .await
            .expect("valid usage");
        match res {
            Ok(()) => {
                tracing::debug!(%self.id, "bumped upper of differential collection");
                self.current_upper = now;
            }
            Err(err) => {
                // Someone else advanced the shard; an empty upper means the
                // shard is sealed and we can never write again.
                let actual_upper = if let Some(ts) = err.current.as_option() {
                    ts.clone()
                } else {
                    return ControlFlow::Break("upper is the empty antichain".to_string());
                };

                tracing::info!(%self.id, ?actual_upper, expected_upper = ?self.current_upper, "upper mismatch while bumping upper, syncing to persist state");

                self.current_upper = actual_upper;

                self.sync_to_persist().await;
            }
        }

        ControlFlow::Continue(())
    }

    /// Drains any buffered commands and fails their responders, so callers
    /// don't hang on a task that is going away.
    fn handle_shutdown(&mut self) {
        let mut senders = Vec::new();

        // Prevent new commands from being buffered while we drain.
        self.cmd_rx.close();

        while let Ok((_batch, sender)) = self.cmd_rx.try_recv() {
            senders.push(sender);
        }

        notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));
    }

    /// Applies a batch of write ops to the local `desired`/`to_write` state
    /// and pushes the result to persist, then sleeps out the remainder of the
    /// batch window to rate-limit write traffic.
    async fn handle_updates(
        &mut self,
        batch: &mut Vec<(StorageWriteOp, oneshot::Sender<Result<(), StorageError<T>>>)>,
    ) -> ControlFlow<String> {
        let batch_duration_ms = STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT;

        let use_batch_now = Instant::now();
        let min_time_to_complete = use_batch_now + batch_duration_ms;

        tracing::debug!(
            ?use_batch_now,
            ?batch_duration_ms,
            ?min_time_to_complete,
            "batch duration",
        );

        let mut responders = Vec::with_capacity(batch.len());
        for (op, tx) in batch.drain(..) {
            self.apply_write_op(op);
            responders.push(tx);
        }

        // Keep both sets small and canonical.
        consolidation::consolidate(&mut self.desired);
        consolidation::consolidate(&mut self.to_write);

        // We're about to write, which advances the upper itself, so push the
        // next idle upper bump out.
        self.upper_tick_interval.reset();

        self.write_to_persist(responders).await?;

        // Don't finish the batch before the batch window has elapsed.
        tokio::time::sleep_until(min_time_to_complete).await;

        ControlFlow::Continue(())
    }

    /// Folds a single write op into `desired` and `to_write`. Deletions turn
    /// matching desired updates into retractions.
    fn apply_write_op(&mut self, op: StorageWriteOp) {
        match op {
            StorageWriteOp::Append { updates } => {
                self.desired.extend_from_slice(&updates);
                self.to_write.extend(updates);
            }
            StorageWriteOp::Delete { filter } => {
                let to_delete = self.desired.extract_if(.., |(row, _)| filter(row));
                let retractions = to_delete.map(|(row, diff)| (row, -diff));
                self.to_write.extend(retractions);
            }
        }
    }

    /// Appends `to_write` to persist at `current_upper`, retrying upper
    /// mismatches with backoff (up to 20 tries), and notifies `responders`
    /// of the outcome. On each mismatch, re-syncs local state to persist.
    async fn write_to_persist(
        &mut self,
        responders: Vec<oneshot::Sender<Result<(), StorageError<T>>>>,
    ) -> ControlFlow<String> {
        if self.read_only {
            tracing::debug!(%self.id, "not writing to differential collection: read-only");
            return ControlFlow::Continue(());
        }

        let retries = Retry::default()
            .initial_backoff(Duration::from_secs(1))
            .clamp_backoff(Duration::from_secs(3))
            .factor(1.25)
            .max_tries(20)
            .into_retry_stream();
        let mut retries = Box::pin(retries);

        loop {
            // Pick a new upper strictly beyond the current one, even if the
            // clock hasn't advanced.
            let now = T::from((self.now)());
            let new_upper = std::cmp::max(
                now,
                TimestampManipulation::step_forward(&self.current_upper),
            );

            // All updates are written at the current upper.
            let updates_to_write = self
                .to_write
                .iter()
                .map(|(row, diff)| {
                    (
                        (SourceData(Ok(row.clone())), ()),
                        self.current_upper.clone(),
                        diff.into_inner(),
                    )
                })
                .collect::<Vec<_>>();

            assert!(!self.read_only);
            let res = self
                .write_handle
                .compare_and_append(
                    updates_to_write,
                    Antichain::from_elem(self.current_upper.clone()),
                    Antichain::from_elem(new_upper.clone()),
                )
                .await
                .expect("valid usage");
            match res {
                Ok(()) => {
                    notify_listeners(responders, || Ok(()));

                    self.current_upper = new_upper;

                    // Everything pending is now in the shard.
                    self.to_write.clear();

                    tracing::debug!(%self.id, "appended to differential collection");

                    break;
                }
                Err(err) => {
                    // An empty upper means the shard is sealed; no further
                    // writes are possible.
                    let actual_upper = if let Some(ts) = err.current.as_option() {
                        ts.clone()
                    } else {
                        return ControlFlow::Break("upper is the empty antichain".to_string());
                    };

                    tracing::info!(%self.id, ?actual_upper, expected_upper = ?self.current_upper, "retrying append for differential collection");

                    if retries.next().await.is_none() {
                        // Out of retries: report the mismatch to callers and
                        // give up on this batch.
                        let invalid_upper = InvalidUpper {
                            id: self.id,
                            current_upper: err.current,
                        };
                        notify_listeners(responders, || {
                            Err(StorageError::InvalidUppers(vec![invalid_upper.clone()]))
                        });
                        error!(
                            "exhausted retries when appending to managed collection {}",
                            self.id
                        );
                        break;
                    }

                    self.current_upper = actual_upper;

                    self.sync_to_persist().await;

                    debug!(
                        "Retrying invalid-uppers error while appending to differential collection {}",
                        self.id
                    );
                }
            }
        }

        ControlFlow::Continue(())
    }

    /// Rebuilds `to_write` as the difference between `desired` and the
    /// shard's actual contents, by snapshotting the shard just below
    /// `current_upper` and negating what it holds.
    async fn sync_to_persist(&mut self) {
        let mut read_handle = (self.read_handle_fn)().await;
        let as_of = self
            .current_upper
            .step_back()
            .unwrap_or_else(|| T::minimum());
        let as_of = Antichain::from_elem(as_of);
        let snapshot = read_handle.snapshot_and_fetch(as_of).await;

        // Negate the snapshot so that (desired + negated snapshot)
        // consolidates to the delta we still need to write.
        let mut negated_oks = match snapshot {
            Ok(contents) => {
                let mut snapshot = Vec::with_capacity(contents.len());
                for ((data, _), _, diff) in contents {
                    let row = data.0.unwrap();
                    snapshot.push((row, -Diff::from(diff)));
                }
                snapshot
            }
            Err(_) => panic!("read before since"),
        };

        self.to_write.clear();
        self.to_write.extend(self.desired.iter().cloned());
        self.to_write.append(&mut negated_oks);
        consolidation::consolidate(&mut self.to_write);
    }
}
1039
/// Configuration handed to an [`AppendOnlyWriteTask`] for preparing an
/// append-only introspection collection (e.g. history truncation) before it
/// accepts writes.
pub(crate) struct AppendOnlyIntrospectionConfig<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    // Which introspection collection this task writes to; drives the
    // per-type preparation in `AppendOnlyWriteTask::prepare`.
    pub(crate) introspection_type: IntrospectionType,
    // Dynamic configuration, consulted for retention intervals.
    pub(crate) config_set: Arc<ConfigSet>,
    // Storage parameters, consulted for status-history retention policies.
    pub(crate) parameters: StorageParameters,
    pub(crate) storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
}
1049
/// State of the task that services writes to a single append-only
/// collection.
struct AppendOnlyWriteTask<T>
where
    T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    // Id of the collection this task writes to.
    id: GlobalId,
    write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
    read_only: bool,
    now: NowFn,
    // Batch duration (ms) for user collections, shared with the manager.
    user_batch_duration_ms: Arc<AtomicU64>,
    // Receives update batches, each paired with a oneshot for the result.
    rx: mpsc::UnboundedReceiver<(
        Vec<AppendOnlyUpdate>,
        oneshot::Sender<Result<(), StorageError<T>>>,
    )>,

    shutdown_rx: oneshot::Receiver<()>,
    // For status-history collections: the last status seen per (object,
    // replica), used to drop stale status updates. `None` for all other
    // collection types.
    previous_statuses: Option<BTreeMap<(GlobalId, Option<ReplicaId>), Status>>,
}
1075
1076impl<T> AppendOnlyWriteTask<T>
1077where
1078 T: Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
1079{
    /// Spawns the write task and returns its command channel, an
    /// abort-on-drop handle to the task, and a shutdown sender.
    fn spawn(
        id: GlobalId,
        write_handle: WriteHandle<SourceData, (), T, StorageDiff>,
        read_only: bool,
        now: NowFn,
        user_batch_duration_ms: Arc<AtomicU64>,
        introspection_config: Option<AppendOnlyIntrospectionConfig<T>>,
    ) -> (AppendOnlyWriteChannel<T>, WriteTask, ShutdownSender) {
        let (tx, rx) = mpsc::unbounded_channel();
        let (shutdown_tx, shutdown_rx) = oneshot::channel();

        // Only source/sink status-history collections track previous
        // statuses (to filter superseded updates); everything else gets
        // `None`. Non-append-only types must never reach this task.
        let previous_statuses: Option<BTreeMap<(GlobalId, Option<ReplicaId>), Status>> =
            match introspection_config
                .as_ref()
                .map(|config| config.introspection_type)
            {
                Some(IntrospectionType::SourceStatusHistory)
                | Some(IntrospectionType::SinkStatusHistory) => Some(BTreeMap::new()),

                Some(IntrospectionType::ReplicaMetricsHistory)
                | Some(IntrospectionType::WallclockLagHistory)
                | Some(IntrospectionType::WallclockLagHistogram)
                | Some(IntrospectionType::PrivatelinkConnectionStatusHistory)
                | Some(IntrospectionType::ReplicaStatusHistory)
                | Some(IntrospectionType::PreparedStatementHistory)
                | Some(IntrospectionType::StatementExecutionHistory)
                | Some(IntrospectionType::SessionHistory)
                | Some(IntrospectionType::StatementLifecycleHistory)
                | Some(IntrospectionType::SqlText)
                | None => None,

                Some(introspection_type @ IntrospectionType::ShardMapping)
                | Some(introspection_type @ IntrospectionType::Frontiers)
                | Some(introspection_type @ IntrospectionType::ReplicaFrontiers)
                | Some(introspection_type @ IntrospectionType::StorageSourceStatistics)
                | Some(introspection_type @ IntrospectionType::StorageSinkStatistics)
                | Some(introspection_type @ IntrospectionType::ComputeDependencies)
                | Some(introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus)
                | Some(introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes)
                | Some(introspection_type @ IntrospectionType::ComputeErrorCounts)
                | Some(introspection_type @ IntrospectionType::ComputeHydrationTimes) => {
                    unreachable!("not append-only collection: {introspection_type:?}")
                }
            };

        let mut task = Self {
            id,
            write_handle,
            rx,
            shutdown_rx,
            read_only,
            now,
            user_batch_duration_ms,
            previous_statuses,
        };

        let handle = mz_ore::task::spawn(
            || format!("CollectionManager-append_only_write_task-{id}"),
            async move {
                // Preparation performs writes (history truncation), so it is
                // skipped in read-only mode.
                if !task.read_only {
                    task.prepare(introspection_config).await;
                }
                task.run().await;
            },
        );

        (tx, handle.abort_on_drop(), shutdown_tx)
    }
1155
    /// Runs one-time, per-introspection-type preparation before the task
    /// starts writing: truncates history collections according to their
    /// retention policies and, for status histories, hydrates
    /// `previous_statuses` with the latest status per object. Only called
    /// when the task is not read-only; a no-op for non-introspection
    /// collections (`introspection_config == None`).
    async fn prepare(&mut self, introspection_config: Option<AppendOnlyIntrospectionConfig<T>>) {
        let Some(AppendOnlyIntrospectionConfig {
            introspection_type,
            config_set,
            parameters,
            storage_collections,
        }) = introspection_config
        else {
            return;
        };
        let initial_statuses = match introspection_type {
            IntrospectionType::ReplicaMetricsHistory
            | IntrospectionType::WallclockLagHistory
            | IntrospectionType::WallclockLagHistogram => {
                // Best-effort truncation: log, but don't fail startup, if it
                // errors.
                let result = partially_truncate_metrics_history(
                    self.id,
                    introspection_type,
                    &mut self.write_handle,
                    config_set,
                    self.now.clone(),
                    storage_collections,
                )
                .await;
                if let Err(error) = result {
                    soft_panic_or_log!(
                        "error truncating metrics history: {error} (type={introspection_type:?})"
                    );
                }
                Vec::new()
            }

            IntrospectionType::PrivatelinkConnectionStatusHistory => {
                partially_truncate_status_history(
                    self.id,
                    IntrospectionType::PrivatelinkConnectionStatusHistory,
                    &mut self.write_handle,
                    privatelink_status_history_desc(&parameters),
                    self.now.clone(),
                    &storage_collections,
                )
                .await;
                Vec::new()
            }
            IntrospectionType::ReplicaStatusHistory => {
                partially_truncate_status_history(
                    self.id,
                    IntrospectionType::ReplicaStatusHistory,
                    &mut self.write_handle,
                    replica_status_history_desc(&parameters),
                    self.now.clone(),
                    &storage_collections,
                )
                .await;
                Vec::new()
            }

            IntrospectionType::PreparedStatementHistory
            | IntrospectionType::StatementExecutionHistory
            | IntrospectionType::SessionHistory
            | IntrospectionType::StatementLifecycleHistory
            | IntrospectionType::SqlText => {
                // No preparation needed for these collections.
                Vec::new()
            }

            IntrospectionType::SourceStatusHistory => {
                // Truncate, then extract the latest status per source from
                // the surviving rows to seed `previous_statuses`.
                let last_status_per_id = partially_truncate_status_history(
                    self.id,
                    IntrospectionType::SourceStatusHistory,
                    &mut self.write_handle,
                    source_status_history_desc(&parameters),
                    self.now.clone(),
                    &storage_collections,
                )
                .await;

                let status_col = MZ_SOURCE_STATUS_HISTORY_DESC
                    .get_by_name(&ColumnName::from("status"))
                    .expect("schema has not changed")
                    .0;

                last_status_per_id
                    .into_iter()
                    .map(|(id, row)| {
                        (
                            id,
                            Status::from_str(
                                row.iter()
                                    .nth(status_col)
                                    .expect("schema has not changed")
                                    .unwrap_str(),
                            )
                            .expect("statuses must be uncorrupted"),
                        )
                    })
                    .collect()
            }
            IntrospectionType::SinkStatusHistory => {
                let last_status_per_id = partially_truncate_status_history(
                    self.id,
                    IntrospectionType::SinkStatusHistory,
                    &mut self.write_handle,
                    sink_status_history_desc(&parameters),
                    self.now.clone(),
                    &storage_collections,
                )
                .await;

                let status_col = MZ_SINK_STATUS_HISTORY_DESC
                    .get_by_name(&ColumnName::from("status"))
                    .expect("schema has not changed")
                    .0;

                last_status_per_id
                    .into_iter()
                    .map(|(id, row)| {
                        (
                            id,
                            Status::from_str(
                                row.iter()
                                    .nth(status_col)
                                    .expect("schema has not changed")
                                    .unwrap_str(),
                            )
                            .expect("statuses must be uncorrupted"),
                        )
                    })
                    .collect()
            }

            // These are differential collections and must never be routed to
            // an append-only write task.
            introspection_type @ IntrospectionType::ShardMapping
            | introspection_type @ IntrospectionType::Frontiers
            | introspection_type @ IntrospectionType::ReplicaFrontiers
            | introspection_type @ IntrospectionType::StorageSourceStatistics
            | introspection_type @ IntrospectionType::StorageSinkStatistics
            | introspection_type @ IntrospectionType::ComputeDependencies
            | introspection_type @ IntrospectionType::ComputeOperatorHydrationStatus
            | introspection_type @ IntrospectionType::ComputeMaterializedViewRefreshes
            | introspection_type @ IntrospectionType::ComputeErrorCounts
            | introspection_type @ IntrospectionType::ComputeHydrationTimes => {
                unreachable!("not append-only collection: {introspection_type:?}")
            }
        };
        if let Some(previous_statuses) = &mut self.previous_statuses {
            previous_statuses.extend(initial_statuses);
        }
    }
1312
    /// Main loop: services shutdown requests, incoming update batches, and a
    /// periodic empty append that advances the collection's upper, in that
    /// (biased) priority order. Returns when shut down or when all senders
    /// are dropped.
    async fn run(mut self) {
        let mut interval = tokio::time::interval(Duration::from_millis(DEFAULT_TICK_MS));

        let mut batch: Vec<(Vec<_>, _)> = Vec::new();

        'run: loop {
            tokio::select! {
                // `biased` makes shutdown win over pending work.
                biased;

                _ = &mut self.shutdown_rx => {
                    // Fail any buffered requests so callers don't hang.
                    let mut senders = Vec::new();

                    self.rx.close();

                    while let Ok((_batch, sender)) = self.rx.try_recv() {
                        senders.push(sender);
                    }

                    notify_listeners(senders, || Err(StorageError::IdentifierInvalid(self.id)));

                    break 'run;
                }

                () = recv_all_commands(&mut self.rx, &mut batch) => {
                    // An empty batch means the channel closed with nothing
                    // buffered, i.e. all senders are gone.
                    if batch.is_empty() {
                        break 'run;
                    }

                    // User collections use the runtime-tunable batch
                    // duration; system collections use the default.
                    let batch_duration_ms = match self.id {
                        GlobalId::User(_) => Duration::from_millis(
                            self.user_batch_duration_ms.load(Ordering::Relaxed),
                        ),
                        _ => STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION_DEFAULT,
                    };
                    let use_batch_now = Instant::now();
                    let min_time_to_complete = use_batch_now + batch_duration_ms;

                    tracing::debug!(
                        ?use_batch_now,
                        ?batch_duration_ms,
                        ?min_time_to_complete,
                        "batch duration",
                    );

                    // We're about to append, which advances the upper, so
                    // push the next idle tick out.
                    interval.reset();

                    let capacity: usize = batch
                        .iter()
                        .map(|(rows, _)| rows.len())
                        .sum();
                    let mut all_rows = Vec::with_capacity(capacity);
                    let mut responders = Vec::with_capacity(batch.len());

                    for (updates, responder) in batch.drain(..) {
                        // Filters out superseded status updates for
                        // status-history collections.
                        let rows = self.process_updates(updates);

                        all_rows.extend(
                            rows.map(|(row, diff)| TimestamplessUpdate { row, diff }),
                        );
                        responders.push(responder);
                    }

                    if self.read_only {
                        tracing::warn!(%self.id, ?all_rows, "append while in read-only mode");
                        notify_listeners(responders, || Err(StorageError::ReadOnly));
                        continue;
                    }

                    let at_least = T::from((self.now)());

                    if !all_rows.is_empty() {
                        monotonic_append(&mut self.write_handle, all_rows, at_least).await;
                    }
                    notify_listeners(responders, || Ok(()));

                    // Don't finish the batch before the window has elapsed.
                    tokio::time::sleep_until(min_time_to_complete).await;
                }

                _ = interval.tick() => {
                    // Never move the upper in read-only mode.
                    if self.read_only {
                        continue;
                    }

                    // Empty append to advance the upper while idle.
                    let now = T::from((self.now)());
                    let updates = vec![];
                    let at_least = now.clone();

                    monotonic_append(&mut self.write_handle, updates, at_least).await;
                },
            }
        }

        info!("write_task-{} ending", self.id);
    }
1445
1446 fn process_updates(
1449 &mut self,
1450 updates: Vec<AppendOnlyUpdate>,
1451 ) -> impl Iterator<Item = (Row, Diff)> {
1452 let updates = if let Some(previous_statuses) = &mut self.previous_statuses {
1453 let new: Vec<_> = updates
1454 .into_iter()
1455 .filter(|r| match r {
1456 AppendOnlyUpdate::Row(_) => true,
1457 AppendOnlyUpdate::Status(update) => {
1458 match (
1459 previous_statuses
1460 .get(&(update.id, update.replica_id))
1461 .as_deref(),
1462 &update.status,
1463 ) {
1464 (None, _) => true,
1465 (Some(old), new) => old.superseded_by(*new),
1466 }
1467 }
1468 })
1469 .collect();
1470 previous_statuses.extend(new.iter().filter_map(|update| match update {
1471 AppendOnlyUpdate::Row(_) => None,
1472 AppendOnlyUpdate::Status(update) => {
1473 Some(((update.id, update.replica_id), update.status))
1474 }
1475 }));
1476 new
1477 } else {
1478 updates
1479 };
1480
1481 updates.into_iter().map(AppendOnlyUpdate::into_row)
1482 }
1483}
1484
/// Truncates the given metrics-history collection by retracting all rows whose
/// event time falls outside the configured retention interval.
///
/// Reads a snapshot at the collection's current upper (stepped back), stages a
/// retraction for every expired row, and appends the retractions in a single
/// batch that advances the upper by one step.
///
/// Returns an error if the collection is sealed, if the snapshot cannot be
/// read, or if appending the retractions fails (e.g. due to a concurrent
/// writer advancing the upper).
async fn partially_truncate_metrics_history<T>(
    id: GlobalId,
    introspection_type: IntrospectionType,
    write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
    config_set: Arc<ConfigSet>,
    now: NowFn,
    storage_collections: Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
) -> Result<(), anyhow::Error>
where
    T: Codec64 + From<EpochMillis> + TimestampManipulation,
{
    // The retention interval and the column holding the event time both
    // depend on which metrics history collection this is.
    let (keep_duration, occurred_at_col) = match introspection_type {
        IntrospectionType::ReplicaMetricsHistory => (
            REPLICA_METRICS_HISTORY_RETENTION_INTERVAL.get(&config_set),
            REPLICA_METRICS_HISTORY_DESC
                .get_by_name(&ColumnName::from("occurred_at"))
                .expect("schema has not changed")
                .0,
        ),
        IntrospectionType::WallclockLagHistory => (
            WALLCLOCK_LAG_HISTORY_RETENTION_INTERVAL.get(&config_set),
            WALLCLOCK_LAG_HISTORY_DESC
                .get_by_name(&ColumnName::from("occurred_at"))
                .expect("schema has not changed")
                .0,
        ),
        IntrospectionType::WallclockLagHistogram => (
            WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RETENTION_INTERVAL.get(&config_set),
            WALLCLOCK_GLOBAL_LAG_HISTOGRAM_RAW_DESC
                .get_by_name(&ColumnName::from("period_start"))
                .expect("schema has not changed")
                .0,
        ),
        _ => panic!("not a metrics history: {introspection_type:?}"),
    };

    let upper = write_handle.fetch_recent_upper().await;
    let Some(upper_ts) = upper.as_option() else {
        bail!("collection is sealed");
    };
    // If the upper cannot be stepped back there is no complete time to read
    // at yet, so there is nothing to truncate.
    let Some(as_of_ts) = upper_ts.step_back() else {
        return Ok(()); };

    let mut rows = storage_collections
        .snapshot_cursor(id, as_of_ts)
        .await
        .map_err(|e| anyhow!("reading snapshot: {e:?}"))?;

    let now = mz_ore::now::to_datetime(now());
    let keep_since = now - keep_duration;

    // Retractions are written at the old upper; the append advances the upper
    // by exactly one step.
    let old_upper_ts = upper_ts.clone();
    let new_upper_ts = TimestampManipulation::step_forward(&old_upper_ts);

    let mut builder = write_handle.builder(Antichain::from_elem(old_upper_ts.clone()));
    while let Some(chunk) = rows.next().await {
        for (data, _t, diff) in chunk {
            // Skip error rows; only `Ok` rows carry an event time.
            let Ok(row) = &data.0 else { continue };
            let datums = row.unpack();
            let occurred_at = datums[occurred_at_col].unwrap_timestamptz();
            if *occurred_at >= keep_since {
                continue;
            }
            // Stage a retraction by negating the diff.
            let diff = -diff;
            match builder.add(&data, &(), &old_upper_ts, &diff).await? {
                Added::Record => {}
                Added::RecordAndParts => {
                    debug!(?id, "added part to builder");
                }
            }
        }
    }

    let mut updates = builder
        .finish(Antichain::from_elem(new_upper_ts.clone()))
        .await?;
    let mut batches = vec![&mut updates];

    write_handle
        .compare_and_append_batch(
            batches.as_mut_slice(),
            Antichain::from_elem(old_upper_ts),
            Antichain::from_elem(new_upper_ts),
            true,
        )
        .await
        .expect("valid usage")
        .map_err(|e| anyhow!("appending retractions: {e:?}"))
}
1586
1587pub(crate) async fn partially_truncate_status_history<T, K>(
1597 id: GlobalId,
1598 introspection_type: IntrospectionType,
1599 write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
1600 status_history_desc: StatusHistoryDesc<K>,
1601 now: NowFn,
1602 storage_collections: &Arc<dyn StorageCollections<Timestamp = T> + Send + Sync>,
1603) -> BTreeMap<K, Row>
1604where
1605 T: Codec64 + From<EpochMillis> + TimestampManipulation,
1606 K: Clone + Debug + Ord + Send + Sync,
1607{
1608 let upper = write_handle.fetch_recent_upper().await.clone();
1609
1610 let mut rows = match upper.as_option() {
1611 Some(f) if f > &T::minimum() => {
1612 let as_of = f.step_back().unwrap();
1613
1614 storage_collections
1615 .snapshot_cursor(id, as_of)
1616 .await
1617 .expect("snapshot succeeds")
1618 }
1619 _ => return BTreeMap::new(),
1622 };
1623
1624 let mut latest_row_per_key: BTreeMap<K, (CheckedTimestamp<DateTime<Utc>>, Row)> =
1626 BTreeMap::new();
1627
1628 let expected_upper = upper.into_option().expect("checked above");
1633 let new_upper = TimestampManipulation::step_forward(&expected_upper);
1634
1635 let mut deletions = write_handle.builder(Antichain::from_elem(expected_upper.clone()));
1636
1637 let mut handle_row = {
1638 let latest_row_per_key = &mut latest_row_per_key;
1639 move |row: &Row, diff| {
1640 let datums = row.unpack();
1641 let key = (status_history_desc.extract_key)(&datums);
1642 let timestamp = (status_history_desc.extract_time)(&datums);
1643
1644 assert!(
1645 diff > 0,
1646 "only know how to operate over consolidated data with diffs > 0, \
1647 found diff {diff} for object {key:?} in {introspection_type:?}",
1648 );
1649
1650 match latest_row_per_key.get(&key) {
1652 Some(existing) if &existing.0 > ×tamp => {}
1653 _ => {
1654 latest_row_per_key.insert(key.clone(), (timestamp, row.clone()));
1655 }
1656 };
1657 (key, timestamp)
1658 }
1659 };
1660
1661 match status_history_desc.retention_policy {
1662 StatusHistoryRetentionPolicy::LastN(n) => {
1663 let mut last_n_entries_per_key: BTreeMap<
1665 K,
1666 BinaryHeap<Reverse<(CheckedTimestamp<DateTime<Utc>>, Row)>>,
1667 > = BTreeMap::new();
1668
1669 while let Some(chunk) = rows.next().await {
1670 for (data, _t, diff) in chunk {
1671 let Ok(row) = &data.0 else { continue };
1672 let (key, timestamp) = handle_row(row, diff);
1673
1674 let entries = last_n_entries_per_key.entry(key).or_default();
1677 for _ in 0..diff {
1678 entries.push(Reverse((timestamp, row.clone())));
1688
1689 while entries.len() > n {
1692 if let Some(Reverse((_, r))) = entries.pop() {
1693 deletions
1694 .add(&SourceData(Ok(r)), &(), &expected_upper, &-1)
1695 .await
1696 .expect("usage should be valid");
1697 }
1698 }
1699 }
1700 }
1701 }
1702 }
1703 StatusHistoryRetentionPolicy::TimeWindow(time_window) => {
1704 let now = mz_ore::now::to_datetime(now());
1706 let keep_since = now - time_window;
1707
1708 while let Some(chunk) = rows.next().await {
1710 for (data, _t, diff) in chunk {
1711 let Ok(row) = &data.0 else { continue };
1712 let (_, timestamp) = handle_row(row, diff);
1713
1714 if *timestamp < keep_since {
1715 deletions
1716 .add(&data, &(), &expected_upper, &-1)
1717 .await
1718 .expect("usage should be valid");
1719 }
1720 }
1721 }
1722 }
1723 }
1724
1725 let mut updates = deletions
1726 .finish(Antichain::from_elem(new_upper.clone()))
1727 .await
1728 .expect("expected valid usage");
1729 let mut batches = vec![&mut updates];
1730
1731 let res = write_handle
1733 .compare_and_append_batch(
1734 batches.as_mut_slice(),
1735 Antichain::from_elem(expected_upper.clone()),
1736 Antichain::from_elem(new_upper),
1737 true,
1738 )
1739 .await
1740 .expect("usage was valid");
1741
1742 match res {
1743 Ok(_) => {
1744 }
1746 Err(err) => {
1747 info!(
1755 %id, ?expected_upper, current_upper = ?err.current,
1756 "failed to append partial truncation",
1757 );
1758 }
1759 }
1760
1761 latest_row_per_key
1762 .into_iter()
1763 .map(|(key, (_, row))| (key, row))
1764 .collect()
1765}
1766
1767async fn monotonic_append<T: Timestamp + Lattice + Codec64 + TimestampManipulation>(
1768 write_handle: &mut WriteHandle<SourceData, (), T, StorageDiff>,
1769 updates: Vec<TimestamplessUpdate>,
1770 at_least: T,
1771) {
1772 let mut expected_upper = write_handle.shared_upper();
1773 loop {
1774 if updates.is_empty() && expected_upper.is_empty() {
1775 return;
1779 }
1780
1781 let upper = expected_upper
1782 .into_option()
1783 .expect("cannot append data to closed collection");
1784
1785 let lower = if upper.less_than(&at_least) {
1786 at_least.clone()
1787 } else {
1788 upper.clone()
1789 };
1790
1791 let new_upper = TimestampManipulation::step_forward(&lower);
1792 let updates = updates
1793 .iter()
1794 .map(|TimestamplessUpdate { row, diff }| {
1795 (
1796 (SourceData(Ok(row.clone())), ()),
1797 lower.clone(),
1798 diff.into_inner(),
1799 )
1800 })
1801 .collect::<Vec<_>>();
1802 let res = write_handle
1803 .compare_and_append(
1804 updates,
1805 Antichain::from_elem(upper),
1806 Antichain::from_elem(new_upper),
1807 )
1808 .await
1809 .expect("valid usage");
1810 match res {
1811 Ok(()) => return,
1812 Err(err) => {
1813 expected_upper = err.current;
1814 continue;
1815 }
1816 }
1817 }
1818}
1819
1820fn notify_listeners<T>(
1822 responders: impl IntoIterator<Item = oneshot::Sender<T>>,
1823 result: impl Fn() -> T,
1824) {
1825 for r in responders {
1826 let _ = r.send(result());
1828 }
1829}
1830
1831async fn recv_all_commands<T>(rx: &mut mpsc::UnboundedReceiver<T>, out: &mut Vec<T>) {
1844 if let Some(msg) = rx.recv().await {
1845 out.push(msg);
1846 } else {
1847 return; };
1849
1850 out.reserve(rx.len());
1851 while let Ok(msg) = rx.try_recv() {
1852 out.push(msg);
1853 }
1854
1855 if out.capacity() > out.len() * 4 {
1860 out.shrink_to_fit();
1861 }
1862}
1863
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use super::*;
    use itertools::Itertools;
    use mz_repr::{Datum, Row};
    use mz_storage_client::client::StatusUpdate;
    use mz_storage_client::healthcheck::{
        MZ_SINK_STATUS_HISTORY_DESC, MZ_SOURCE_STATUS_HISTORY_DESC,
    };

    // A status update with both an error and a hint renders a row that
    // typechecks against the sink and source status history schemas, with
    // the hint list as the only entry in the details map (column 4).
    #[mz_ore::test]
    fn test_row() {
        let error_message = "error message";
        let hint = "hint message";
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let row = Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: Some(error_message.to_string()),
            hints: BTreeSet::from([hint.to_string()]),
            namespaced_errors: Default::default(),
            replica_id: None,
        });

        // The row must typecheck against both status history descriptions.
        for (datum, column_type) in row.iter().zip_eq(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of_sql(column_type));
        }

        for (datum, column_type) in row
            .iter()
            .zip_eq(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types())
        {
            assert!(datum.is_instance_of_sql(column_type));
        }

        // Columns 1-3: object id, status string, error message.
        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
        assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));

        // Column 4: the details map, which here holds only the hints.
        let details = row
            .iter()
            .nth(4)
            .unwrap()
            .unwrap_map()
            .iter()
            .collect::<Vec<_>>();

        assert_eq!(details.len(), 1);
        let hint_datum = &details[0];

        assert_eq!(hint_datum.0, "hints");
        assert_eq!(
            hint_datum.1.unwrap_list().iter().next().unwrap(),
            Datum::String(hint)
        );
    }

    // Without hints (and no namespaced errors), the details column is null.
    #[mz_ore::test]
    fn test_row_without_hint() {
        let error_message = "error message";
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let row = Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: Some(error_message.to_string()),
            hints: Default::default(),
            namespaced_errors: Default::default(),
            replica_id: None,
        });

        for (datum, column_type) in row.iter().zip_eq(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of_sql(column_type));
        }

        for (datum, column_type) in row
            .iter()
            .zip_eq(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types())
        {
            assert!(datum.is_instance_of_sql(column_type));
        }

        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
        assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
        assert_eq!(row.iter().nth(4).unwrap(), Datum::Null);
    }

    // Without an error, the error column is null but hints still appear in
    // the details map.
    #[mz_ore::test]
    fn test_row_without_error() {
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let hint = "hint message";
        let row = Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: None,
            hints: BTreeSet::from([hint.to_string()]),
            namespaced_errors: Default::default(),
            replica_id: None,
        });

        for (datum, column_type) in row.iter().zip_eq(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of_sql(column_type));
        }

        for (datum, column_type) in row
            .iter()
            .zip_eq(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types())
        {
            assert!(datum.is_instance_of_sql(column_type));
        }

        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
        assert_eq!(row.iter().nth(3).unwrap(), Datum::Null);

        let details = row
            .iter()
            .nth(4)
            .unwrap()
            .unwrap_map()
            .iter()
            .collect::<Vec<_>>();

        assert_eq!(details.len(), 1);
        let hint_datum = &details[0];

        assert_eq!(hint_datum.0, "hints");
        assert_eq!(
            hint_datum.1.unwrap_list().iter().next().unwrap(),
            Datum::String(hint)
        );
    }

    // Namespaced errors render as a nested map under the "namespaced" key in
    // the details column.
    #[mz_ore::test]
    fn test_row_with_namespaced() {
        let error_message = "error message";
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let row = Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: Some(error_message.to_string()),
            hints: Default::default(),
            namespaced_errors: BTreeMap::from([("thing".to_string(), "error".to_string())]),
            replica_id: None,
        });

        for (datum, column_type) in row.iter().zip_eq(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of_sql(column_type));
        }

        for (datum, column_type) in row
            .iter()
            .zip_eq(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types())
        {
            assert!(datum.is_instance_of_sql(column_type));
        }

        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
        assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));

        let details = row
            .iter()
            .nth(4)
            .unwrap()
            .unwrap_map()
            .iter()
            .collect::<Vec<_>>();

        assert_eq!(details.len(), 1);
        let ns_datum = &details[0];

        assert_eq!(ns_datum.0, "namespaced");
        assert_eq!(
            ns_datum.1.unwrap_map().iter().next().unwrap(),
            ("thing", Datum::String("error"))
        );
    }

    // With both hints and namespaced errors, the details map carries two
    // entries: "hints" first, then "namespaced".
    #[mz_ore::test]
    fn test_row_with_everything() {
        let error_message = "error message";
        let hint = "hint message";
        let id = GlobalId::User(1);
        let status = Status::Dropped;
        let row = Row::from(StatusUpdate {
            id,
            timestamp: chrono::offset::Utc::now(),
            status,
            error: Some(error_message.to_string()),
            hints: BTreeSet::from([hint.to_string()]),
            namespaced_errors: BTreeMap::from([("thing".to_string(), "error".to_string())]),
            replica_id: None,
        });

        for (datum, column_type) in row.iter().zip_eq(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
            assert!(datum.is_instance_of_sql(column_type));
        }

        for (datum, column_type) in row
            .iter()
            .zip_eq(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types())
        {
            assert!(datum.is_instance_of_sql(column_type));
        }

        assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
        assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
        assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));

        let details = row
            .iter()
            .nth(4)
            .unwrap()
            .unwrap_map()
            .iter()
            .collect::<Vec<_>>();

        assert_eq!(details.len(), 2);
        let hint_datum = &details[0];
        let ns_datum = &details[1];

        assert_eq!(hint_datum.0, "hints");
        assert_eq!(
            hint_datum.1.unwrap_list().iter().next().unwrap(),
            Datum::String(hint)
        );

        assert_eq!(ns_datum.0, "namespaced");
        assert_eq!(
            ns_datum.1.unwrap_map().iter().next().unwrap(),
            ("thing", Datum::String("error"))
        );
    }
}