mz_storage_types/
read_holds.rs1use std::fmt::Debug;
11use std::sync::Arc;
12
13use mz_repr::GlobalId;
14use thiserror::Error;
15use timely::PartialOrder;
16use timely::progress::{Antichain, ChangeBatch, Timestamp as TimelyTimestamp};
17use tokio::sync::mpsc::UnboundedSender;
18use tokio::sync::mpsc::error::SendError;
19
20pub type ChangeTx<T> = Arc<
21 dyn Fn(GlobalId, ChangeBatch<T>) -> Result<(), SendError<(GlobalId, ChangeBatch<T>)>>
22 + Send
23 + Sync,
24>;
25
26pub struct ReadHold<T: TimelyTimestamp> {
33 id: GlobalId,
35
36 since: Antichain<T>,
38
39 change_tx: ChangeTx<T>,
41}
42
43impl<T: TimelyTimestamp> Debug for ReadHold<T> {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 f.debug_struct("ReadHold")
46 .field("id", &self.id)
47 .field("since", &self.since)
48 .finish_non_exhaustive()
49 }
50}
51
52#[derive(Error, Debug)]
54pub enum ReadHoldDowngradeError<T> {
55 #[error("since violation: new frontier {frontier:?} is not beyond current since {since:?}")]
57 SinceViolation {
58 frontier: Antichain<T>,
60 since: Antichain<T>,
62 },
63}
64
65impl<T: TimelyTimestamp> ReadHold<T> {
66 pub fn new(id: GlobalId, since: Antichain<T>, change_tx: ChangeTx<T>) -> Self {
67 Self {
68 id,
69 since,
70 change_tx,
71 }
72 }
73
74 pub fn with_channel(
75 id: GlobalId,
76 since: Antichain<T>,
77 channel_tx: UnboundedSender<(GlobalId, ChangeBatch<T>)>,
78 ) -> Self {
79 let tx = Arc::new(move |id, changes| channel_tx.send((id, changes)));
80 Self::new(id, since, tx)
81 }
82
83 pub fn id(&self) -> GlobalId {
85 self.id
86 }
87
88 pub fn since(&self) -> &Antichain<T> {
93 &self.since
94 }
95
96 pub fn merge_assign(&mut self, mut other: ReadHold<T>) {
103 assert_eq!(
104 self.id, other.id,
105 "can only merge ReadHolds for the same ID"
106 );
107
108 let mut changes = ChangeBatch::new();
109 changes.extend(self.since.iter().map(|t| (t.clone(), -1)));
110 changes.extend(other.since.iter().map(|t| (t.clone(), -1)));
111
112 let other_since = std::mem::take(&mut other.since);
116
117 self.since.extend(other_since);
118
119 changes.extend(self.since.iter().map(|t| (t.clone(), 1)));
123
124 match (self.change_tx)(self.id, changes) {
125 Ok(_) => (),
126 Err(e) => {
127 panic!("cannot merge ReadHold: {}", e);
128 }
129 }
130 }
131
132 pub fn try_downgrade(
136 &mut self,
137 frontier: Antichain<T>,
138 ) -> Result<(), ReadHoldDowngradeError<T>> {
139 if PartialOrder::less_than(&frontier, &self.since) {
140 return Err(ReadHoldDowngradeError::SinceViolation {
141 frontier,
142 since: self.since.clone(),
143 });
144 }
145
146 let mut changes = ChangeBatch::new();
147
148 changes.extend(self.since.iter().map(|t| (t.clone(), -1)));
149 changes.extend(frontier.iter().map(|t| (t.clone(), 1)));
150 self.since = frontier;
151
152 if !changes.is_empty() {
153 let _ = (self.change_tx)(self.id, changes);
155 }
156
157 Ok(())
158 }
159
160 pub fn release(&mut self) {
162 self.try_downgrade(Antichain::new())
163 .expect("known to succeed");
164 }
165}
166
167impl<T: TimelyTimestamp> Clone for ReadHold<T> {
168 fn clone(&self) -> Self {
169 if self.id.is_user() {
170 tracing::trace!("cloning ReadHold on {}: {:?}", self.id, self.since);
171 }
172
173 let mut changes = ChangeBatch::new();
175
176 changes.extend(self.since.iter().map(|t| (t.clone(), 1)));
177
178 if !changes.is_empty() {
179 match (self.change_tx)(self.id.clone(), changes) {
182 Ok(_) => (),
183 Err(e) => {
184 panic!("cannot clone ReadHold: {}", e);
185 }
186 }
187 }
188
189 Self {
190 id: self.id.clone(),
191 since: self.since.clone(),
192 change_tx: Arc::clone(&self.change_tx),
193 }
194 }
195}
196
197impl<T: TimelyTimestamp> Drop for ReadHold<T> {
198 fn drop(&mut self) {
199 if self.id.is_user() {
200 tracing::trace!("dropping ReadHold on {}: {:?}", self.id, self.since);
201 }
202
203 self.release();
204 }
205}
206
207#[derive(Error, Debug)]
208pub enum ReadHoldError {
209 #[error("collection does not exist: {0}")]
210 CollectionMissing(GlobalId),
211 #[error("desired read hold frontier is not beyond the since of collection: {0}")]
212 SinceViolation(GlobalId),
213}