1use std::cmp::Ordering;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::future::Future;
16use std::sync::Arc;
17
18use differential_dataflow::difference::Monoid;
19use differential_dataflow::lattice::Lattice;
20use futures::Stream;
21use mz_ore::instrument;
22use mz_ore::task::AbortOnDropHandle;
23use mz_persist_client::cfg::USE_CRITICAL_SINCE_TXN;
24use mz_persist_client::critical::SinceHandle;
25use mz_persist_client::read::{Cursor, LazyPartStats, ListenEvent, ReadHandle, Since, Subscribe};
26use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
27use mz_persist_client::write::WriteHandle;
28use mz_persist_client::{Diagnostics, PersistClient, ShardId};
29use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
30use mz_persist_types::{Codec, Codec64, Opaque, StepForward};
31use timely::order::TotalOrder;
32use timely::progress::{Antichain, Timestamp};
33use tokio::sync::{mpsc, oneshot};
34use tracing::{debug, warn};
35use uuid::Uuid;
36
37use crate::TxnsCodecDefault;
38use crate::txn_cache::{TxnsCache, TxnsCacheState};
39
/// A description of how to read a single data shard as of one timestamp.
///
/// Values of this type are handed out by [`TxnsRead::data_snapshot`] and are
/// consumed by the `snapshot_*` helpers on this type, which first call
/// `unblock_read` and then read the shard at `as_of`.
#[derive(Debug)]
#[cfg_attr(test, derive(PartialEq))]
pub struct DataSnapshot<T> {
    /// The data shard this snapshot refers to.
    pub(crate) data_id: ShardId,
    /// If present, a write timestamp that the shard's upper must have passed
    /// before the shard is read; `validate` requires it to be `<= as_of`.
    pub(crate) latest_write: Option<T>,
    /// The timestamp at which the shard is read.
    pub(crate) as_of: T,
    /// The (exclusive) timestamp up to which `unblock_read` advances the
    /// shard's upper using empty writes; `validate` requires
    /// `as_of < empty_to`.
    pub(crate) empty_to: T,
}
60
61impl<T: Timestamp + Lattice + TotalOrder + Codec64 + Sync> DataSnapshot<T> {
62 #[instrument(level = "debug", fields(shard = %self.data_id, ts = ?self.as_of, empty_to = ?self.empty_to))]
65 pub(crate) async fn unblock_read<K, V, D>(&self, mut data_write: WriteHandle<K, V, T, D>)
66 where
67 K: Debug + Codec,
68 V: Debug + Codec,
69 D: Debug + Monoid + Ord + Codec64 + Send + Sync,
70 {
71 debug!(
72 "unblock_read latest_write={:?} as_of={:?} for {:.9}",
73 self.latest_write,
74 self.as_of,
75 self.data_id.to_string()
76 );
77 if let Some(latest_write) = self.latest_write.as_ref() {
79 let () = data_write
80 .wait_for_upper_past(&Antichain::from_elem(latest_write.clone()))
81 .await;
82 }
83
84 let Some(mut data_upper) = data_write.shared_upper().into_option() else {
96 debug!(
99 "CaA data snapshot {:.9} shard finalized",
100 self.data_id.to_string(),
101 );
102 return;
103 };
104
105 while data_upper < self.empty_to {
109 if let Some(latest_write) = self.latest_write.as_ref() {
113 assert!(latest_write < &data_upper);
114 }
115 assert!(self.as_of < self.empty_to);
116 let res = crate::small_caa(
117 || format!("data {:.9} unblock reads", self.data_id.to_string()),
118 &mut data_write,
119 &[],
120 data_upper.clone(),
121 self.empty_to.clone(),
122 )
123 .await;
124 match res {
125 Ok(()) => {
126 data_write.expire().await;
130 break;
131 }
132 Err(new_data_upper) => {
133 data_upper = new_data_upper;
134 continue;
135 }
136 }
137 }
138 }
139
140 pub async fn snapshot_and_fetch<K, V, D>(
142 &self,
143 data_read: &mut ReadHandle<K, V, T, D>,
144 ) -> Result<Vec<((K, V), T, D)>, Since<T>>
145 where
146 K: Debug + Codec + Ord,
147 V: Debug + Codec + Ord,
148 D: Debug + Monoid + Ord + Codec64 + Send + Sync,
149 {
150 let data_write = WriteHandle::from_read(data_read, "unblock_read");
151 self.unblock_read(data_write).await;
152 data_read
153 .snapshot_and_fetch(Antichain::from_elem(self.as_of.clone()))
154 .await
155 }
156
157 pub async fn snapshot_cursor<K, V, D>(
159 &self,
160 data_read: &mut ReadHandle<K, V, T, D>,
161 should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
162 ) -> Result<Cursor<K, V, T, D>, Since<T>>
163 where
164 K: Debug + Codec + Ord,
165 V: Debug + Codec + Ord,
166 D: Debug + Monoid + Ord + Codec64 + Send + Sync,
167 {
168 let data_write = WriteHandle::from_read(data_read, "unblock_read");
169 self.unblock_read(data_write).await;
170 data_read
171 .snapshot_cursor(Antichain::from_elem(self.as_of.clone()), should_fetch_part)
172 .await
173 }
174
175 pub async fn snapshot_and_stream<K, V, D>(
177 &self,
178 data_read: &mut ReadHandle<K, V, T, D>,
179 ) -> Result<impl Stream<Item = ((K, V), T, D)> + use<K, V, T, D>, Since<T>>
180 where
181 K: Debug + Codec + Ord,
182 V: Debug + Codec + Ord,
183 D: Debug + Monoid + Ord + Codec64 + Send + Sync,
184 {
185 let data_write = WriteHandle::from_read(data_read, "unblock_read");
186 self.unblock_read(data_write).await;
187 data_read
188 .snapshot_and_stream(Antichain::from_elem(self.as_of.clone()))
189 .await
190 }
191
192 pub fn snapshot_stats_from_critical<K, V, D, O>(
194 &self,
195 data_since: &SinceHandle<K, V, T, D, O>,
196 ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static
197 where
198 K: Debug + Codec + Ord,
199 V: Debug + Codec + Ord,
200 D: Monoid + Codec64 + Send + Sync,
201 O: Opaque + Codec64,
202 {
203 let as_of = self.latest_write.clone().map(Antichain::from_elem);
220 data_since.snapshot_stats(as_of)
221 }
222
223 pub fn snapshot_stats_from_leased<K, V, D>(
225 &self,
226 data_since: &ReadHandle<K, V, T, D>,
227 ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static
228 where
229 K: Debug + Codec + Ord,
230 V: Debug + Codec + Ord,
231 D: Ord + Monoid + Codec64 + Send + Sync,
232 {
233 let as_of = self.latest_write.clone().map(Antichain::from_elem);
250 data_since.snapshot_stats(as_of)
251 }
252
253 pub async fn snapshot_parts_stats<K, V, D>(
255 &self,
256 data_read: &ReadHandle<K, V, T, D>,
257 ) -> Result<SnapshotPartsStats, Since<T>>
258 where
259 K: Debug + Codec + Ord,
260 V: Debug + Codec + Ord,
261 D: Debug + Monoid + Ord + Codec64 + Send + Sync,
262 {
263 let data_write = WriteHandle::from_read(data_read, "unblock_read");
264 self.unblock_read(data_write).await;
265 data_read
266 .snapshot_parts_stats(Antichain::from_elem(self.as_of.clone()))
267 .await
268 }
269
270 pub(crate) fn validate(&self) -> Result<(), String> {
271 if let Some(latest_write) = self.latest_write.as_ref() {
272 if !(latest_write <= &self.as_of) {
273 return Err(format!(
274 "latest_write {:?} not <= as_of {:?}",
275 self.latest_write, self.as_of
276 ));
277 }
278 }
279 if !(self.as_of < self.empty_to) {
280 return Err(format!(
281 "as_of {:?} not < empty_to {:?}",
282 self.as_of, self.empty_to
283 ));
284 }
285 Ok(())
286 }
287}
288
/// The next step for a listener on a data shard, as returned by
/// `TxnsCacheState::data_listen_next`.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub enum DataListenNext<T> {
    /// Advance to the given timestamp by reading the data shard. In
    /// `TxnsReadTask::update_subscription` this sets both the physical and
    /// the logical upper of the remap entry to the given value.
    ReadDataTo(T),
    /// Advance only the logical upper to the given timestamp; the physical
    /// upper is left untouched by `TxnsReadTask::update_subscription`.
    EmitLogicalProgress(T),
    /// Nothing more can be done right now; `update_subscription` stops
    /// looping when it sees this variant.
    WaitForTxnsProgress,
}
307
/// A pair of uppers for a data shard that is sent to data subscribers each
/// time `TxnsReadTask::update_subscription` advances a subscription.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DataRemapEntry<T> {
    /// Advanced only on `DataListenNext::ReadDataTo`.
    /// NOTE(review): presumably the upper of the data shard itself — confirm.
    pub physical_upper: T,
    /// Advanced on both `DataListenNext::ReadDataTo` and
    /// `DataListenNext::EmitLogicalProgress`.
    /// NOTE(review): presumably the upper as known via the txns shard — confirm.
    pub logical_upper: T,
}
323
/// The state of a subscription to one data shard, as returned by
/// `TxnsCacheState::data_subscribe`.
#[derive(Debug)]
pub(crate) struct DataSubscribe<T> {
    /// The data shard being subscribed to.
    pub(crate) data_id: ShardId,
    /// An optional initial snapshot; `TxnsReadTask::run` takes it out and
    /// returns it to the caller that requested the subscription.
    pub(crate) snapshot: Option<DataSnapshot<T>>,
    /// The most recent remap entry computed for this subscription.
    pub(crate) remap: DataRemapEntry<T>,
}
335
/// A [`DataSubscribe`] together with the channel used to push its updates.
#[derive(Debug)]
pub struct DataSubscription<T> {
    /// The subscription state that is advanced as the cache progresses.
    subscribe: DataSubscribe<T>,
    /// Sender for remap entries; `TxnsReadTask::run` drops the subscription
    /// once this channel is closed.
    tx: mpsc::UnboundedSender<DataRemapEntry<T>>,
}
344
/// A cloneable handle for issuing read requests against a txns shard.
///
/// Requests are sent as [`TxnsReadCmd`]s over `tx` to a background
/// `TxnsReadTask`; see [`TxnsRead::start`].
#[derive(Debug, Clone)]
pub struct TxnsRead<T> {
    /// The id of the txns shard this handle reads.
    txns_id: ShardId,
    /// Command channel to the read task.
    tx: mpsc::UnboundedSender<TxnsReadCmd<T>>,
    /// Handle for the task that serves commands; shared between clones.
    _read_task: Arc<AbortOnDropHandle<()>>,
    /// Handle for the task that follows the txns shard; shared between clones.
    _subscribe_task: Arc<AbortOnDropHandle<()>>,
}
353
354impl<T: Timestamp + Lattice + Codec64 + Sync> TxnsRead<T> {
355 pub async fn start<C>(client: PersistClient, txns_id: ShardId) -> Self
357 where
358 T: TotalOrder + StepForward,
359 C: TxnsCodec + 'static,
360 {
361 let (tx, rx) = mpsc::unbounded_channel();
362
363 let (mut subscribe_task, cache) =
364 TxnsSubscribeTask::<T, C>::open(&client, txns_id, None, tx.clone()).await;
365
366 let mut task = TxnsReadTask {
367 rx,
368 cache,
369 pending_waits_by_ts: BTreeSet::new(),
370 pending_waits_by_id: BTreeMap::new(),
371 data_subscriptions: Vec::new(),
372 };
373
374 let read_task =
375 mz_ore::task::spawn(|| "txn-wal::read_task", async move { task.run().await });
376
377 let subscribe_task = mz_ore::task::spawn(|| "txn-wal::subscribe_task", async move {
378 subscribe_task.run().await
379 });
380
381 TxnsRead {
382 txns_id,
383 tx,
384 _read_task: Arc::new(read_task.abort_on_drop()),
385 _subscribe_task: Arc::new(subscribe_task.abort_on_drop()),
386 }
387 }
388
389 pub fn txns_id(&self) -> &ShardId {
391 &self.txns_id
392 }
393
394 pub async fn data_snapshot(&self, data_id: ShardId, as_of: T) -> DataSnapshot<T> {
396 self.send(|tx| TxnsReadCmd::DataSnapshot { data_id, as_of, tx })
397 .await
398 }
399
400 pub(crate) async fn data_subscribe<K, V, D>(
404 &self,
405 data_id: ShardId,
406 as_of: T,
407 unblock: WriteHandle<K, V, T, D>,
408 ) -> mpsc::UnboundedReceiver<DataRemapEntry<T>>
409 where
410 K: Debug + Codec,
411 V: Debug + Codec,
412 T: Timestamp + Lattice + TotalOrder + StepForward + Codec64,
413 D: Debug + Monoid + Ord + Codec64 + Send + Sync,
414 {
415 let (snapshot, rest) = self
416 .send(|tx| TxnsReadCmd::DataSubscribe { data_id, as_of, tx })
417 .await;
418 if let Some(snapshot) = snapshot {
419 snapshot.unblock_read(unblock).await;
420 }
421 rest
422 }
423
424 pub async fn update_ge(&self, ts: T) {
426 let wait = WaitTs::GreaterEqual(ts);
427 self.update(wait).await
428 }
429
430 pub async fn update_gt(&self, ts: T) {
432 let wait = WaitTs::GreaterThan(ts);
433 self.update(wait).await
434 }
435
436 async fn update(&self, wait: WaitTs<T>) {
437 let id = Uuid::new_v4();
438 let res = self.send(|tx| TxnsReadCmd::Wait {
439 id: id.clone(),
440 ts: wait,
441 tx,
442 });
443
444 let mut cancel_guard = CancelWaitOnDrop {
447 id,
448 tx: Some(self.tx.clone()),
449 };
450
451 let res = res.await;
452
453 cancel_guard.complete();
455
456 res
457 }
458
459 async fn send<R: std::fmt::Debug>(
460 &self,
461 cmd: impl FnOnce(oneshot::Sender<R>) -> TxnsReadCmd<T>,
462 ) -> R {
463 let (tx, rx) = oneshot::channel();
464 let req = cmd(tx);
465 let () = self.tx.send(req).expect("task unexpectedly shut down");
466 rx.await.expect("task unexpectedly shut down")
467 }
468}
469
/// A drop guard that sends `TxnsReadCmd::CancelWait` for `id` when dropped,
/// unless `complete` was called first.
struct CancelWaitOnDrop<T> {
    /// The id of the wait this guard would cancel.
    id: Uuid,
    /// Channel to the read task; `None` once the guard has been disarmed.
    tx: Option<mpsc::UnboundedSender<TxnsReadCmd<T>>>,
}
476
477impl<T> CancelWaitOnDrop<T> {
478 pub fn complete(&mut self) {
481 self.tx.take();
482 }
483}
484
485impl<T> Drop for CancelWaitOnDrop<T> {
486 fn drop(&mut self) {
487 let tx = match self.tx.take() {
488 Some(tx) => tx,
489 None => {
490 return;
492 }
493 };
494
495 let _ = tx.send(TxnsReadCmd::CancelWait {
496 id: self.id.clone(),
497 });
498 }
499}
500
/// Commands processed by `TxnsReadTask::run`.
#[derive(Debug)]
enum TxnsReadCmd<T> {
    /// New txns shard entries plus the frontier they are complete up to;
    /// sent by `TxnsSubscribeTask::run`.
    Updates {
        /// The buffered entries read since the previous `Updates`.
        entries: Vec<(TxnsEntry, T, i64)>,
        /// The progress timestamp reported by the subscribe.
        frontier: T,
    },
    /// Request a [`DataSnapshot`] for `data_id` at `as_of`.
    DataSnapshot {
        /// The data shard to snapshot.
        data_id: ShardId,
        /// The timestamp to snapshot at.
        as_of: T,
        /// Where the resulting snapshot is sent.
        tx: oneshot::Sender<DataSnapshot<T>>,
    },
    /// Register a subscription for `data_id` starting at `as_of`.
    DataSubscribe {
        /// The data shard to subscribe to.
        data_id: ShardId,
        /// The timestamp to subscribe from.
        as_of: T,
        /// Where the optional initial snapshot and the remap-entry receiver
        /// are sent.
        tx: oneshot::Sender<(
            Option<DataSnapshot<T>>,
            mpsc::UnboundedReceiver<DataRemapEntry<T>>,
        )>,
    },
    /// Register a wait that is answered once `ts` is ready.
    Wait {
        /// Identifier used to cancel this wait later.
        id: Uuid,
        /// The condition being waited for.
        ts: WaitTs<T>,
        /// Notified with `()` when the wait completes.
        tx: oneshot::Sender<()>,
    },
    /// Forget the pending wait with the given id, if it is still registered.
    CancelWait {
        /// Identifier of the wait to cancel.
        id: Uuid,
    },
}
529
/// A condition on a frontier: either "at least `ts`" or "strictly past `ts`".
#[derive(Debug, PartialEq, Eq, Clone)]
enum WaitTs<T> {
    /// Satisfied once the frontier is `>=` the contained timestamp.
    GreaterEqual(T),
    /// Satisfied once the frontier is `>` the contained timestamp.
    GreaterThan(T),
}

impl<T: Ord> Ord for WaitTs<T> {
    /// Orders waits by timestamp first. For equal timestamps,
    /// `GreaterEqual` sorts before `GreaterThan`.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Decompose a wait into its timestamp and a tie-breaking rank.
        fn parts<U>(wait: &WaitTs<U>) -> (&U, u8) {
            match wait {
                WaitTs::GreaterEqual(ts) => (ts, 0),
                WaitTs::GreaterThan(ts) => (ts, 1),
            }
        }

        let (self_ts, self_rank) = parts(self);
        let (other_ts, other_rank) = parts(other);

        self_ts.cmp(other_ts).then(self_rank.cmp(&other_rank))
    }
}

impl<T: Ord> PartialOrd for WaitTs<T> {
    /// Always `Some`, delegating to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
575
576impl<T: Timestamp + Lattice> WaitTs<T> {
577 fn is_ready(&self, frontier: &T) -> bool {
579 match &self {
580 WaitTs::GreaterEqual(ts) => {
581 if frontier >= ts {
582 return true;
583 }
584 }
585 WaitTs::GreaterThan(ts) => {
586 if frontier > ts {
587 return true;
588 }
589 }
590 };
591
592 false
593 }
594}
595
/// The background task state that serves [`TxnsReadCmd`]s.
#[derive(Debug)]
struct TxnsReadTask<T> {
    /// Incoming commands, from `TxnsRead` handles and the subscribe task.
    rx: mpsc::UnboundedReceiver<TxnsReadCmd<T>>,
    /// The cache of txns shard state that answers snapshot and subscribe
    /// requests; updated on every `Updates` command.
    cache: TxnsCacheState<T>,
    /// Pending waits ordered by their wait condition, so that the earliest
    /// one to become ready is always first.
    pending_waits_by_ts: BTreeSet<(WaitTs<T>, Uuid)>,
    /// The same pending waits keyed by id, used for completion and
    /// cancellation.
    pending_waits_by_id: BTreeMap<Uuid, PendingWait<T>>,
    /// Active data subscriptions; entries whose channel is closed are
    /// removed when the next `Updates` command is processed.
    data_subscriptions: Vec<DataSubscription<T>>,
}
604
/// A registered wait that has not been answered yet.
#[derive(Debug)]
struct PendingWait<T> {
    /// The condition that must hold for the wait to complete.
    ts: WaitTs<T>,
    /// The reply channel; `None` once the wait has been completed or the
    /// receiver was found to be gone.
    tx: Option<oneshot::Sender<()>>,
}
612
613impl<T: Timestamp + Lattice + Codec64> PendingWait<T> {
614 fn maybe_complete(&mut self, frontier: &T) -> bool {
619 if self.tx.is_none() {
620 return true;
622 }
623
624 if self.ts.is_ready(frontier) {
625 let _ = self.tx.take().expect("known to exist").send(());
626 return true;
627 }
628
629 if let Some(tx) = self.tx.as_ref() {
630 if tx.is_closed() {
631 self.tx.take();
633 return true;
634 }
635 }
636
637 false
638 }
639}
640
641impl<T> TxnsReadTask<T>
642where
643 T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
644{
645 async fn run(&mut self) {
646 while let Some(cmd) = self.rx.recv().await {
647 match cmd {
648 TxnsReadCmd::Updates { entries, frontier } => {
649 tracing::trace!(
650 "updates from subscribe task at ({:?}): {:?}",
651 frontier,
652 entries
653 );
654
655 self.cache.push_entries(entries.clone(), frontier.clone());
656
657 self.data_subscriptions
658 .retain(|subscription| !subscription.tx.is_closed());
659 for subscription in &mut self.data_subscriptions {
660 Self::update_subscription(subscription, &self.cache);
661 }
662
663 loop {
667 let first_wait = self.pending_waits_by_ts.first();
668
669 let (wait_ts, id) = match first_wait {
670 Some(wait) => wait,
671 None => break,
672 };
673
674 let completed = wait_ts.is_ready(&frontier);
675
676 if completed {
677 let mut wait = self
678 .pending_waits_by_id
679 .remove(id)
680 .expect("wait must be in map");
681
682 let really_completed = wait.maybe_complete(&frontier);
683 assert!(really_completed);
684
685 self.pending_waits_by_ts.pop_first();
686 } else {
687 break;
690 }
691 }
692 }
693 TxnsReadCmd::DataSnapshot { data_id, as_of, tx } => {
694 let res = self.cache.data_snapshot(data_id, as_of.clone());
695 let _ = tx.send(res);
696 }
697 TxnsReadCmd::DataSubscribe { data_id, as_of, tx } => {
698 let mut subscribe = self.cache.data_subscribe(data_id, as_of.clone());
699 let snapshot = subscribe.snapshot.take();
700 let (sub_tx, sub_rx) = mpsc::unbounded_channel();
701 sub_tx
703 .send(subscribe.remap.clone())
704 .expect("receiver still in scope");
705 let mut subscription = DataSubscription {
706 subscribe,
707 tx: sub_tx,
708 };
709 Self::update_subscription(&mut subscription, &self.cache);
711 self.data_subscriptions.push(subscription);
712 let _ = tx.send((snapshot, sub_rx));
713 }
714 TxnsReadCmd::Wait { id, ts, tx } => {
715 let mut pending_wait = PendingWait { ts, tx: Some(tx) };
716 let completed = pending_wait.maybe_complete(&self.cache.progress_exclusive);
717 if !completed {
718 let wait_ts = pending_wait.ts.clone();
719 self.pending_waits_by_ts.insert((wait_ts, id.clone()));
720 self.pending_waits_by_id.insert(id, pending_wait);
721 }
722 }
723 TxnsReadCmd::CancelWait { id } => {
724 if let Some(pending_wait) = self.pending_waits_by_id.remove(&id) {
729 self.pending_waits_by_ts.remove(&(pending_wait.ts, id));
730 }
731 }
732 }
733 }
734 warn!("TxnsReadTask shutting down");
735 }
736
737 fn update_subscription(subscription: &mut DataSubscription<T>, cache: &TxnsCacheState<T>) {
738 loop {
739 match cache.data_listen_next(
740 &subscription.subscribe.data_id,
741 &subscription.subscribe.remap.logical_upper,
742 ) {
743 DataListenNext::ReadDataTo(new_upper) => {
745 subscription.subscribe.remap.physical_upper = new_upper.clone();
747 subscription.subscribe.remap.logical_upper = new_upper.clone();
748 }
749 DataListenNext::EmitLogicalProgress(new_progress) => {
752 assert!(subscription.subscribe.remap.physical_upper < new_progress);
753 assert!(subscription.subscribe.remap.logical_upper < new_progress);
754
755 subscription.subscribe.remap.logical_upper = new_progress.clone();
756 }
757 DataListenNext::WaitForTxnsProgress => break,
760 };
761 let _ = subscription.tx.send(subscription.subscribe.remap.clone());
763 }
764 assert_eq!(
765 cache.progress_exclusive, subscription.subscribe.remap.logical_upper,
766 "we should update the subscription up to the current progress_exclusive"
767 );
768 }
769}
770
/// The background task state that follows the txns shard and forwards what
/// it reads to the read task as `TxnsReadCmd::Updates`.
#[derive(Debug)]
struct TxnsSubscribeTask<T, C: TxnsCodec = TxnsCodecDefault> {
    /// The subscription on the txns shard.
    txns_subscribe: Subscribe<C::Key, C::Val, T, i64>,

    /// Entries fetched since the last progress event; flushed to the read
    /// task on every `ListenEvent::Progress`.
    buf: Vec<(TxnsEntry, T, i64)>,

    /// Command channel to the read task.
    tx: mpsc::UnboundedSender<TxnsReadCmd<T>>,

    /// Forwarded to `TxnsCacheState::init` and `TxnsCache::fetch_parts`.
    /// NOTE(review): presumably restricts the task to a single data shard
    /// when set — confirm against those functions.
    only_data_id: Option<ShardId>,
}
792
793impl<T, C> TxnsSubscribeTask<T, C>
794where
795 T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
796 C: TxnsCodec,
797{
798 pub async fn open(
810 client: &PersistClient,
811 txns_id: ShardId,
812 only_data_id: Option<ShardId>,
813 tx: mpsc::UnboundedSender<TxnsReadCmd<T>>,
814 ) -> (Self, TxnsCacheState<T>) {
815 let (txns_key_schema, txns_val_schema) = C::schemas();
816 let txns_read: ReadHandle<<C as TxnsCodec>::Key, <C as TxnsCodec>::Val, T, i64> = client
817 .open_leased_reader(
818 txns_id,
819 Arc::new(txns_key_schema),
820 Arc::new(txns_val_schema),
821 Diagnostics {
822 shard_name: "txns".to_owned(),
823 handle_purpose: "read txns".to_owned(),
824 },
825 USE_CRITICAL_SINCE_TXN.get(client.dyncfgs()),
826 )
827 .await
828 .expect("txns schema shouldn't change");
829 let (state, txns_subscribe) = TxnsCacheState::init::<C>(only_data_id, txns_read).await;
830 let subscribe_task = TxnsSubscribeTask {
831 txns_subscribe,
832 buf: Vec::new(),
833 tx,
834 only_data_id,
835 };
836
837 (subscribe_task, state)
838 }
839
840 async fn run(&mut self) {
841 loop {
842 let events = self.txns_subscribe.next(None).await;
843 for event in events {
844 match event {
845 ListenEvent::Progress(frontier) => {
846 let frontier_ts = frontier
847 .into_option()
848 .expect("nothing should close the txns shard");
849 let entries = std::mem::take(&mut self.buf);
850 let res = self.tx.send(TxnsReadCmd::Updates {
851 entries,
852 frontier: frontier_ts,
853 });
854 if let Err(e) = res {
855 warn!("TxnsSubscribeTask shutting down: {}", e);
856 return;
857 }
858 }
859 ListenEvent::Updates(parts) => {
860 TxnsCache::<T, C>::fetch_parts(
861 self.only_data_id.clone(),
862 &mut self.txns_subscribe,
863 parts,
864 &mut self.buf,
865 )
866 .await;
867 }
868 };
869 }
870 }
871 }
872}
873
#[cfg(test)]
mod tests {
    use super::WaitTs;

    /// Sorting orders waits by timestamp, with `GreaterEqual` before
    /// `GreaterThan` at the same timestamp.
    #[mz_ore::test]
    fn wait_ts_ord() {
        let expected = vec![
            WaitTs::GreaterThan(1),
            WaitTs::GreaterEqual(2),
            WaitTs::GreaterThan(2),
            WaitTs::GreaterThan(3),
        ];

        // Deliberately out of order.
        let mut waits = vec![
            WaitTs::GreaterThan(3),
            WaitTs::GreaterThan(2),
            WaitTs::GreaterEqual(2),
            WaitTs::GreaterThan(1),
        ];
        waits.sort();

        assert_eq!(waits, expected);
    }
}