use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::future::Future;
use std::sync::Arc;

use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use futures::Stream;
use mz_ore::instrument;
use mz_ore::task::AbortOnDropHandle;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_TXN;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::read::{Cursor, LazyPartStats, ListenEvent, ReadHandle, Since, Subscribe};
use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient, ShardId};
use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
use mz_persist_types::{Codec, Codec64, Opaque, StepForward};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, warn};
use uuid::Uuid;

use crate::TxnsCodecDefault;
use crate::txn_cache::{TxnsCache, TxnsCacheState};

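/// A token exchangeable for a snapshot of a data shard at some `as_of`.
///
/// Invariant (checked by `validate`): `latest_write <= as_of < empty_to`.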
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct DataSnapshot<T> {
    /// The id of the data shard this snapshot is for.
    pub(crate) data_id: ShardId,
    /// If set, a write to the data shard at or before `as_of` that reads must
    /// wait to be applied before they can complete.
    pub(crate) latest_write: Option<T>,
    /// The timestamp at which to read the data shard.
    pub(crate) as_of: T,
    /// The timestamp (exclusive) up to which `unblock_read` may fill the data
    /// shard with empty updates so that reads at `as_of` are not blocked.
    pub(crate) empty_to: T,
}

impl<T: Timestamp + Lattice + TotalOrder + Codec64 + Sync> DataSnapshot<T> {
    #[instrument(level = "debug", fields(shard = %self.data_id, ts = ?self.as_of, empty_to = ?self.empty_to))]
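    /// Ensures that the data shard's physical upper has advanced far enough
    /// that a read at `self.as_of` returns promptly.
    ///
    /// This first waits for the upper to pass `latest_write` (if any) and then
    /// compare-and-appends empty updates up to `empty_to`, retrying on upper
    /// mismatches. If the shard has been finalized (its upper is empty), there
    /// is nothing to do.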
    pub(crate) async fn unblock_read<K, V, D>(&self, mut data_write: WriteHandle<K, V, T, D>)
    where
        K: Debug + Codec,
        V: Debug + Codec,
        D: Debug + Monoid + Ord + Codec64 + Send + Sync,
    {
        debug!(
            "unblock_read latest_write={:?} as_of={:?} for {:.9}",
            self.latest_write,
            self.as_of,
            self.data_id.to_string()
        );
        if let Some(latest_write) = self.latest_write.as_ref() {
            // Wait for the latest write at or before `as_of` to have landed in
            // the data shard before trying to fill in any empty space.
            let () = data_write
                .wait_for_upper_past(&Antichain::from_elem(latest_write.clone()))
                .await;
        }

        let Some(mut data_upper) = data_write.shared_upper().into_option() else {
            // The shard has been finalized (its upper is the empty antichain),
            // so there is nothing left to unblock.
            debug!(
                "CaA data snapshot {:.9} shard finalized",
                self.data_id.to_string(),
            );
            return;
        };

        while data_upper < self.empty_to {
            // Try to fill `[data_upper, empty_to)` with an empty batch. If the
            // upper has moved in the meantime, retry from the new upper.
            if let Some(latest_write) = self.latest_write.as_ref() {
                // We waited for the latest write to land above, so the upper
                // must already be past it.
                assert!(latest_write < &data_upper);
            }
            assert!(self.as_of < self.empty_to);
            let res = crate::small_caa(
                || format!("data {:.9} unblock reads", self.data_id.to_string()),
                &mut data_write,
                &[],
                data_upper.clone(),
                self.empty_to.clone(),
            )
            .await;
            match res {
                Ok(()) => {
                    // Success: we wrote the empty batch, so expire the handle
                    // and stop.
                    data_write.expire().await;
                    break;
                }
                Err(new_data_upper) => {
                    data_upper = new_data_upper;
                    continue;
                }
            }
        }
    }

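    /// Returns the contents of the data shard at `self.as_of`, first running
    /// `unblock_read` so the fetch is not blocked. See
    /// `ReadHandle::snapshot_and_fetch`.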
    pub async fn snapshot_and_fetch<K, V, D>(
        &self,
        data_read: &mut ReadHandle<K, V, T, D>,
    ) -> Result<Vec<((Result<K, String>, Result<V, String>), T, D)>, Since<T>>
    where
        K: Debug + Codec + Ord,
        V: Debug + Codec + Ord,
        D: Debug + Monoid + Ord + Codec64 + Send + Sync,
    {
        let data_write = WriteHandle::from_read(data_read, "unblock_read");
        self.unblock_read(data_write).await;
        data_read
            .snapshot_and_fetch(Antichain::from_elem(self.as_of.clone()))
            .await
    }

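    /// Returns a `Cursor` over the contents of the data shard at `self.as_of`,
    /// first running `unblock_read` so the read is not blocked. The
    /// `should_fetch_part` callback is given each part's stats to decide
    /// whether the part should be fetched.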
    pub async fn snapshot_cursor<K, V, D>(
        &self,
        data_read: &mut ReadHandle<K, V, T, D>,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
    ) -> Result<Cursor<K, V, T, D>, Since<T>>
    where
        K: Debug + Codec + Ord,
        V: Debug + Codec + Ord,
        D: Debug + Monoid + Ord + Codec64 + Send + Sync,
    {
        let data_write = WriteHandle::from_read(data_read, "unblock_read");
        self.unblock_read(data_write).await;
        data_read
            .snapshot_cursor(Antichain::from_elem(self.as_of.clone()), should_fetch_part)
            .await
    }

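    /// Returns the contents of the data shard at `self.as_of` as a `Stream` of
    /// updates, first running `unblock_read` so the read is not blocked.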
    pub async fn snapshot_and_stream<K, V, D>(
        &self,
        data_read: &mut ReadHandle<K, V, T, D>,
    ) -> Result<
        impl Stream<Item = ((Result<K, String>, Result<V, String>), T, D)> + use<K, V, T, D>,
        Since<T>,
    >
    where
        K: Debug + Codec + Ord,
        V: Debug + Codec + Ord,
        D: Debug + Monoid + Ord + Codec64 + Send + Sync,
    {
        let data_write = WriteHandle::from_read(data_read, "unblock_read");
        self.unblock_read(data_write).await;
        data_read
            .snapshot_and_stream(Antichain::from_elem(self.as_of.clone()))
            .await
    }

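    /// Returns aggregate statistics about the contents of the data shard,
    /// using a critical `SinceHandle`.
    ///
    /// The stats are requested as of `latest_write` (if any) rather than
    /// `self.as_of`, and `unblock_read` is not run first.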
    pub fn snapshot_stats_from_critical<K, V, D, O>(
        &self,
        data_since: &SinceHandle<K, V, T, D, O>,
    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static
    where
        K: Debug + Codec + Ord,
        V: Debug + Codec + Ord,
        D: Monoid + Codec64 + Send + Sync,
        O: Opaque + Codec64,
    {
        // Note that we use `latest_write` (not `self.as_of`) as the as_of of
        // the stats request and don't run `unblock_read` first.
        let as_of = self.latest_write.clone().map(Antichain::from_elem);
        data_since.snapshot_stats(as_of)
    }

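    /// The same as `snapshot_stats_from_critical`, but using a leased
    /// `ReadHandle` instead of a critical `SinceHandle`.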
    pub fn snapshot_stats_from_leased<K, V, D>(
        &self,
        data_since: &ReadHandle<K, V, T, D>,
    ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static
    where
        K: Debug + Codec + Ord,
        V: Debug + Codec + Ord,
        D: Ord + Monoid + Codec64 + Send + Sync,
    {
        // As above, use `latest_write` (not `self.as_of`) as the as_of of the
        // stats request and don't run `unblock_read` first.
        let as_of = self.latest_write.clone().map(Antichain::from_elem);
        data_since.snapshot_stats(as_of)
    }

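    /// Returns per-part statistics about the contents of the data shard at
    /// `self.as_of`, first running `unblock_read` so the read is not blocked.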
    pub async fn snapshot_parts_stats<K, V, D>(
        &self,
        data_read: &ReadHandle<K, V, T, D>,
    ) -> Result<SnapshotPartsStats, Since<T>>
    where
        K: Debug + Codec + Ord,
        V: Debug + Codec + Ord,
        D: Debug + Monoid + Ord + Codec64 + Send + Sync,
    {
        let data_write = WriteHandle::from_read(data_read, "unblock_read");
        self.unblock_read(data_write).await;
        data_read
            .snapshot_parts_stats(Antichain::from_elem(self.as_of.clone()))
            .await
    }

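    /// Verifies that `latest_write <= as_of < empty_to`, returning an error
    /// describing any violation.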
    pub(crate) fn validate(&self) -> Result<(), String> {
        if let Some(latest_write) = self.latest_write.as_ref() {
            if !(latest_write <= &self.as_of) {
                return Err(format!(
                    "latest_write {:?} not <= as_of {:?}",
                    self.latest_write, self.as_of
                ));
            }
        }
        if !(self.as_of < self.empty_to) {
            return Err(format!(
                "as_of {:?} not < empty_to {:?}",
                self.as_of, self.empty_to
            ));
        }
        Ok(())
    }
}

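/// The next action to take when following a data shard via the txns shard.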
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub enum DataListenNext<T> {
    /// Read the data shard's contents up to (exclusive) this timestamp.
    ReadDataTo(T),
    /// There are no new physical writes; the data shard is logically caught up
    /// to (exclusive) this timestamp.
    EmitLogicalProgress(T),
    /// Nothing more can be determined until the txns shard's frontier
    /// advances.
    WaitForTxnsProgress,
}

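/// A pairing of a data shard's physical upper with its logical (txn-wal)
/// upper.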
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DataRemapEntry<T> {
    /// An upper bound on the physical contents of the data shard.
    pub physical_upper: T,
    /// An upper bound on what has been logically written to the data shard
    /// (i.e. via the txns shard); kept at or ahead of `physical_upper`.
    pub logical_upper: T,
}

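/// The state of a data shard subscription as tracked by the txns cache: an
/// optional initial snapshot plus the current remap entry.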
#[derive(Debug)]
pub(crate) struct DataSubscribe<T> {
    /// The id of the data shard being subscribed to.
    pub(crate) data_id: ShardId,
    /// A snapshot to unblock and emit before any updates, if one is needed.
    pub(crate) snapshot: Option<DataSnapshot<T>>,
    /// The current remap entry for the data shard.
    pub(crate) remap: DataRemapEntry<T>,
}

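/// A registered subscriber to `DataRemapEntry` updates for a single data
/// shard.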
#[derive(Debug)]
pub struct DataSubscription<T> {
    /// The current state of the subscription.
    subscribe: DataSubscribe<T>,
    /// The channel on which new remap entries are sent.
    tx: mpsc::UnboundedSender<DataRemapEntry<T>>,
}

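/// A shared handle for reading from the txns shard.
///
/// Reads are served by a pair of background tasks: one subscribes to the txns
/// shard and one answers commands sent over an internal channel. Clones share
/// the same tasks, which are aborted when the last clone is dropped.
///
/// A minimal usage sketch (not compiled as a doc-test; it assumes a
/// `PersistClient`, a txns `ShardId`, a data `ShardId`, and a `u64` timestamp
/// named `client`, `txns_id`, `data_id`, and `ts` are already in hand):
///
/// ```ignore
/// let txns_read = TxnsRead::<u64>::start::<TxnsCodecDefault>(client, txns_id).await;
/// // Wait for the txns shard to progress past `ts`, then read `data_id` at `ts`.
/// txns_read.update_gt(ts).await;
/// let snapshot = txns_read.data_snapshot(data_id, ts).await;
/// ```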
#[derive(Debug, Clone)]
pub struct TxnsRead<T> {
    txns_id: ShardId,
    tx: mpsc::UnboundedSender<TxnsReadCmd<T>>,
    _read_task: Arc<AbortOnDropHandle<()>>,
    _subscribe_task: Arc<AbortOnDropHandle<()>>,
}

impl<T: Timestamp + Lattice + Codec64 + Sync> TxnsRead<T> {
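    /// Starts the background tasks and returns a handle to them.
    ///
    /// `C` is the codec used for the txns shard itself.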
    pub async fn start<C>(client: PersistClient, txns_id: ShardId) -> Self
    where
        T: TotalOrder + StepForward,
        C: TxnsCodec + 'static,
    {
        let (tx, rx) = mpsc::unbounded_channel();

        let (mut subscribe_task, cache) =
            TxnsSubscribeTask::<T, C>::open(&client, txns_id, None, tx.clone()).await;

        let mut task = TxnsReadTask {
            rx,
            cache,
            pending_waits_by_ts: BTreeSet::new(),
            pending_waits_by_id: BTreeMap::new(),
            data_subscriptions: Vec::new(),
        };

        let read_task =
            mz_ore::task::spawn(|| "txn-wal::read_task", async move { task.run().await });

        let subscribe_task = mz_ore::task::spawn(|| "txn-wal::subscribe_task", async move {
            subscribe_task.run().await
        });

        TxnsRead {
            txns_id,
            tx,
            _read_task: Arc::new(read_task.abort_on_drop()),
            _subscribe_task: Arc::new(subscribe_task.abort_on_drop()),
        }
    }

    /// Returns the `ShardId` of the txns shard.
    pub fn txns_id(&self) -> &ShardId {
        &self.txns_id
    }

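    /// Returns a `DataSnapshot` for reading `data_id` at `as_of`.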
    pub async fn data_snapshot(&self, data_id: ShardId, as_of: T) -> DataSnapshot<T> {
        self.send(|tx| TxnsReadCmd::DataSnapshot { data_id, as_of, tx })
            .await
    }

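    /// Subscribes to `DataRemapEntry` updates for `data_id`, starting at
    /// `as_of`. If an initial snapshot is needed, it is unblocked with the
    /// provided write handle before the receiver is returned.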
    pub(crate) async fn data_subscribe<K, V, D>(
        &self,
        data_id: ShardId,
        as_of: T,
        unblock: WriteHandle<K, V, T, D>,
    ) -> mpsc::UnboundedReceiver<DataRemapEntry<T>>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + TotalOrder + StepForward + Codec64,
        D: Debug + Monoid + Ord + Codec64 + Send + Sync,
    {
        let (snapshot, rest) = self
            .send(|tx| TxnsReadCmd::DataSubscribe { data_id, as_of, tx })
            .await;
        if let Some(snapshot) = snapshot {
            snapshot.unblock_read(unblock).await;
        }
        rest
    }

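    /// Waits for the txns shard's frontier to be greater than or equal to
    /// `ts`.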
    pub async fn update_ge(&self, ts: T) {
        let wait = WaitTs::GreaterEqual(ts);
        self.update(wait).await
    }

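    /// Waits for the txns shard's frontier to be strictly greater than `ts`.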
    pub async fn update_gt(&self, ts: T) {
        let wait = WaitTs::GreaterThan(ts);
        self.update(wait).await
    }

    async fn update(&self, wait: WaitTs<T>) {
        let id = Uuid::new_v4();
        let res = self.send(|tx| TxnsReadCmd::Wait {
            id: id.clone(),
            ts: wait,
            tx,
        });

        // Ensure that the wait is cancelled in the task if this future is
        // dropped before it completes.
        let mut cancel_guard = CancelWaitOnDrop {
            id,
            tx: Some(self.tx.clone()),
        };

        let res = res.await;

        // The wait completed normally, so disarm the guard.
        cancel_guard.complete();

        res
    }

    async fn send<R: std::fmt::Debug>(
        &self,
        cmd: impl FnOnce(oneshot::Sender<R>) -> TxnsReadCmd<T>,
    ) -> R {
        let (tx, rx) = oneshot::channel();
        let req = cmd(tx);
        let () = self.tx.send(req).expect("task unexpectedly shut down");
        rx.await.expect("task unexpectedly shut down")
    }
}

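/// A guard that cancels a registered wait in the read task if it is dropped
/// before `complete` is called.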
struct CancelWaitOnDrop<T> {
    id: Uuid,
    tx: Option<mpsc::UnboundedSender<TxnsReadCmd<T>>>,
}

impl<T> CancelWaitOnDrop<T> {
    /// Marks the wait as complete, so that dropping this guard does not send
    /// a cancel command.
    pub fn complete(&mut self) {
        self.tx.take();
    }
}

impl<T> Drop for CancelWaitOnDrop<T> {
    fn drop(&mut self) {
        let tx = match self.tx.take() {
            Some(tx) => tx,
            None => {
                // Already completed, nothing to cancel.
                return;
            }
        };

        let _ = tx.send(TxnsReadCmd::CancelWait {
            id: self.id.clone(),
        });
    }
}

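/// Commands serviced by `TxnsReadTask`.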
#[derive(Debug)]
enum TxnsReadCmd<T> {
    Updates {
        entries: Vec<(TxnsEntry, T, i64)>,
        frontier: T,
    },
    DataSnapshot {
        data_id: ShardId,
        as_of: T,
        tx: oneshot::Sender<DataSnapshot<T>>,
    },
    DataSubscribe {
        data_id: ShardId,
        as_of: T,
        tx: oneshot::Sender<(
            Option<DataSnapshot<T>>,
            mpsc::UnboundedReceiver<DataRemapEntry<T>>,
        )>,
    },
    Wait {
        id: Uuid,
        ts: WaitTs<T>,
        tx: oneshot::Sender<()>,
    },
    CancelWait {
        id: Uuid,
    },
}

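/// A bound that the txns shard's frontier must pass for a wait to complete.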
#[derive(Debug, PartialEq, Eq, Clone)]
enum WaitTs<T> {
    GreaterEqual(T),
    GreaterThan(T),
}

impl<T: Ord> Ord for WaitTs<T> {
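    // Order by timestamp, with `GreaterEqual(ts)` sorting before
    // `GreaterThan(ts)`: a `GreaterEqual` wait becomes ready no later than a
    // `GreaterThan` wait at the same timestamp, so iterating pending waits in
    // this order lets the read task stop at the first wait that isn't ready.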
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let self_ts = match self {
            WaitTs::GreaterEqual(ts) => ts,
            WaitTs::GreaterThan(ts) => ts,
        };
        let other_ts = match other {
            WaitTs::GreaterEqual(ts) => ts,
            WaitTs::GreaterThan(ts) => ts,
        };

        if self_ts < other_ts {
            Ordering::Less
        } else if self_ts > other_ts {
            Ordering::Greater
        } else if matches!(self, WaitTs::GreaterEqual(_)) && matches!(other, WaitTs::GreaterThan(_))
        {
            Ordering::Less
        } else if matches!(self, WaitTs::GreaterThan(_)) && matches!(other, WaitTs::GreaterEqual(_))
        {
            Ordering::Greater
        } else {
            Ordering::Equal
        }
    }
}

impl<T: Ord> PartialOrd for WaitTs<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl<T: Timestamp + Lattice> WaitTs<T> {
    /// Returns whether `frontier` satisfies this wait.
    fn is_ready(&self, frontier: &T) -> bool {
        match &self {
            WaitTs::GreaterEqual(ts) => {
                if frontier >= ts {
                    return true;
                }
            }
            WaitTs::GreaterThan(ts) => {
                if frontier > ts {
                    return true;
                }
            }
        };

        false
    }
}

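/// A task that serves `TxnsReadCmd`s from a `TxnsCacheState`, tracking pending
/// frontier waits and data shard subscriptions.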
#[derive(Debug)]
struct TxnsReadTask<T> {
    /// The channel over which commands arrive.
    rx: mpsc::UnboundedReceiver<TxnsReadCmd<T>>,
    /// Cached state of the txns shard.
    cache: TxnsCacheState<T>,
    /// Pending waits, ordered by timestamp so the first entry is the first to
    /// become ready.
    pending_waits_by_ts: BTreeSet<(WaitTs<T>, Uuid)>,
    /// The same pending waits, indexed by id for cancellation.
    pending_waits_by_id: BTreeMap<Uuid, PendingWait<T>>,
    /// Active data shard subscriptions.
    data_subscriptions: Vec<DataSubscription<T>>,
}

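/// A pending wait for the txns shard's frontier to pass some `WaitTs`.
///
/// The oneshot is consumed when the wait completes or when the receiver goes
/// away.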
#[derive(Debug)]
struct PendingWait<T> {
    ts: WaitTs<T>,
    tx: Option<oneshot::Sender<()>>,
}

impl<T: Timestamp + Lattice + Codec64> PendingWait<T> {
    /// Completes this wait if `frontier` satisfies it or if the receiver has
    /// gone away, returning whether the wait is (now) complete.
    fn maybe_complete(&mut self, frontier: &T) -> bool {
        if self.tx.is_none() {
            // Already completed.
            return true;
        }

        if self.ts.is_ready(frontier) {
            let _ = self.tx.take().expect("known to exist").send(());
            return true;
        }

        if let Some(tx) = self.tx.as_ref() {
            if tx.is_closed() {
                // The receiver has gone away, so nobody is waiting on this
                // anymore; mark it complete so it gets cleaned up.
                self.tx.take();
                return true;
            }
        }

        false
    }
}

impl<T> TxnsReadTask<T>
where
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
{
    async fn run(&mut self) {
        while let Some(cmd) = self.rx.recv().await {
            match cmd {
                TxnsReadCmd::Updates { entries, frontier } => {
                    tracing::trace!(
                        "updates from subscribe task at ({:?}): {:?}",
                        frontier,
                        entries
                    );

                    self.cache.push_entries(entries.clone(), frontier.clone());

                    self.data_subscriptions
                        .retain(|subscription| !subscription.tx.is_closed());
                    for subscription in &mut self.data_subscriptions {
                        Self::update_subscription(subscription, &self.cache);
                    }

                    // Complete any pending waits that the new frontier
                    // satisfies, in timestamp order.
                    loop {
                        let first_wait = self.pending_waits_by_ts.first();

                        let (wait_ts, id) = match first_wait {
                            Some(wait) => wait,
                            None => break,
                        };

                        let completed = wait_ts.is_ready(&frontier);

                        if completed {
                            let mut wait = self
                                .pending_waits_by_id
                                .remove(id)
                                .expect("wait must be in map");

                            let really_completed = wait.maybe_complete(&frontier);
                            assert!(really_completed);

                            self.pending_waits_by_ts.pop_first();
                        } else {
                            // Waits are sorted, so if this one isn't ready
                            // yet, none of the later ones are either.
                            break;
                        }
                    }
                }
                TxnsReadCmd::DataSnapshot { data_id, as_of, tx } => {
                    let res = self.cache.data_snapshot(data_id, as_of.clone());
                    let _ = tx.send(res);
                }
                TxnsReadCmd::DataSubscribe { data_id, as_of, tx } => {
                    let mut subscribe = self.cache.data_subscribe(data_id, as_of.clone());
                    let snapshot = subscribe.snapshot.take();
                    let (sub_tx, sub_rx) = mpsc::unbounded_channel();
                    // Send the initial remap entry; the receiver is still in
                    // scope, so this cannot fail.
                    sub_tx
                        .send(subscribe.remap.clone())
                        .expect("receiver still in scope");
                    let mut subscription = DataSubscription {
                        subscribe,
                        tx: sub_tx,
                    };
                    Self::update_subscription(&mut subscription, &self.cache);
                    self.data_subscriptions.push(subscription);
                    let _ = tx.send((snapshot, sub_rx));
                }
                TxnsReadCmd::Wait { id, ts, tx } => {
                    let mut pending_wait = PendingWait { ts, tx: Some(tx) };
                    let completed = pending_wait.maybe_complete(&self.cache.progress_exclusive);
                    if !completed {
                        let wait_ts = pending_wait.ts.clone();
                        self.pending_waits_by_ts.insert((wait_ts, id.clone()));
                        self.pending_waits_by_id.insert(id, pending_wait);
                    }
                }
                TxnsReadCmd::CancelWait { id } => {
                    // The wait might have already completed and been removed,
                    // in which case there is nothing to clean up.
                    if let Some(pending_wait) = self.pending_waits_by_id.remove(&id) {
                        self.pending_waits_by_ts.remove(&(pending_wait.ts, id));
                    }
                }
            }
        }
        warn!("TxnsReadTask shutting down");
    }

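    /// Advances a subscription's remap entry as far as the cache allows,
    /// sending an update to the subscriber for each step.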
    fn update_subscription(subscription: &mut DataSubscription<T>, cache: &TxnsCacheState<T>) {
        loop {
            match cache.data_listen_next(
                &subscription.subscribe.data_id,
                &subscription.subscribe.remap.logical_upper,
            ) {
                DataListenNext::ReadDataTo(new_upper) => {
                    // There are new physical writes, so advance both uppers.
                    subscription.subscribe.remap.physical_upper = new_upper.clone();
                    subscription.subscribe.remap.logical_upper = new_upper.clone();
                }
                DataListenNext::EmitLogicalProgress(new_progress) => {
                    // No new physical writes, but the logical upper can
                    // advance.
                    assert!(subscription.subscribe.remap.physical_upper < new_progress);
                    assert!(subscription.subscribe.remap.logical_upper < new_progress);

                    subscription.subscribe.remap.logical_upper = new_progress.clone();
                }
                // Nothing more can be done until the txns shard advances.
                DataListenNext::WaitForTxnsProgress => break,
            };
            // If the receiver has gone away this send fails, and the
            // subscription is retired on the next update.
            let _ = subscription.tx.send(subscription.subscribe.remap.clone());
        }
        assert_eq!(
            cache.progress_exclusive, subscription.subscribe.remap.logical_upper,
            "we should update the subscription up to the current progress_exclusive"
        );
    }
}

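/// A task that subscribes to the txns shard and forwards batches of updates,
/// along with progress, to the `TxnsReadTask`.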
#[derive(Debug)]
struct TxnsSubscribeTask<T, C: TxnsCodec = TxnsCodecDefault> {
    /// The subscription to the txns shard.
    txns_subscribe: Subscribe<C::Key, C::Val, T, i64>,

    /// Entries buffered since the last progress event.
    buf: Vec<(TxnsEntry, T, i64)>,

    /// The channel over which updates and progress are sent to the read task.
    tx: mpsc::UnboundedSender<TxnsReadCmd<T>>,

    /// If set, the task restricts itself to updates for this data shard.
    only_data_id: Option<ShardId>,
}

impl<T, C> TxnsSubscribeTask<T, C>
where
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
    C: TxnsCodec,
{
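    /// Opens a leased reader on the txns shard, initializes a `TxnsCacheState`
    /// from it, and returns the subscribe task along with that state.
    ///
    /// If `only_data_id` is set, only updates relevant to that data shard are
    /// tracked.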
    pub async fn open(
        client: &PersistClient,
        txns_id: ShardId,
        only_data_id: Option<ShardId>,
        tx: mpsc::UnboundedSender<TxnsReadCmd<T>>,
    ) -> (Self, TxnsCacheState<T>) {
        let (txns_key_schema, txns_val_schema) = C::schemas();
        let txns_read: ReadHandle<<C as TxnsCodec>::Key, <C as TxnsCodec>::Val, T, i64> = client
            .open_leased_reader(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "read txns".to_owned(),
                },
                USE_CRITICAL_SINCE_TXN.get(client.dyncfgs()),
            )
            .await
            .expect("txns schema shouldn't change");
        let (state, txns_subscribe) = TxnsCacheState::init::<C>(only_data_id, txns_read).await;
        let subscribe_task = TxnsSubscribeTask {
            txns_subscribe,
            buf: Vec::new(),
            tx,
            only_data_id,
        };

        (subscribe_task, state)
    }

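    /// Runs the subscription loop: updates are buffered and forwarded to the
    /// read task at each progress event; the loop exits when the read task's
    /// receiver is gone.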
    async fn run(&mut self) {
        loop {
            let events = self.txns_subscribe.next(None).await;
            for event in events {
                match event {
                    ListenEvent::Progress(frontier) => {
                        let frontier_ts = frontier
                            .into_option()
                            .expect("nothing should close the txns shard");
                        let entries = std::mem::take(&mut self.buf);
                        let res = self.tx.send(TxnsReadCmd::Updates {
                            entries,
                            frontier: frontier_ts,
                        });
                        if let Err(e) = res {
                            warn!("TxnsSubscribeTask shutting down: {}", e);
                            return;
                        }
                    }
                    ListenEvent::Updates(parts) => {
                        TxnsCache::<T, C>::fetch_parts(
                            self.only_data_id.clone(),
                            &mut self.txns_subscribe,
                            parts,
                            &mut self.buf,
                        )
                        .await;
                    }
                };
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::WaitTs;

    #[mz_ore::test]
    fn wait_ts_ord() {
        let mut waits = vec![
            WaitTs::GreaterThan(3),
            WaitTs::GreaterThan(2),
            WaitTs::GreaterEqual(2),
            WaitTs::GreaterThan(1),
        ];

        waits.sort();

        let expected = vec![
            WaitTs::GreaterThan(1),
            WaitTs::GreaterEqual(2),
            WaitTs::GreaterThan(2),
            WaitTs::GreaterThan(3),
        ];

        assert_eq!(waits, expected);
    }
}