// mz_storage/source/reclock/compat.rs

use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::stream::LocalBoxStream;
use mz_ore::soft_panic_or_log;
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::read::ListenEvent;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, RelationDesc};
use mz_storage_client::util::remap_handle::{RemapHandle, RemapHandleReader};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::sources::{SourceData, SourceTimestamp};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::Timestamp;
use timely::progress::frontier::Antichain;
use tokio::sync::watch;

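/// A handle to the persist shard backing a source's remap collection. Each
/// update in the shard is a binding from a `FromTime` (the source's native
/// timestamp, e.g. a Kafka partition/offset) to the `IntoTime` at which the
/// reclocked data becomes visible.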
pub struct PersistHandle<FromTime: SourceTimestamp, IntoTime: Timestamp + Lattice + Codec64> {
    /// The snapshot of the remap shard at the `as_of`, followed by all
    /// subsequent updates, as a single stream of listen events.
    events: LocalBoxStream<
        'static,
        ListenEvent<
            IntoTime,
            (
                (Result<SourceData, String>, Result<(), String>),
                IntoTime,
                StorageDiff,
            ),
        >,
    >,
    write_handle: WriteHandle<SourceData, (), IntoTime, StorageDiff>,
    /// Whether this handle is in read-only mode, in which case it must not
    /// write to the remap shard.
    read_only_rx: watch::Receiver<bool>,
    /// Updates that have been read from `events` but not yet released by a
    /// progress event.
    pending_batch: Vec<(FromTime, IntoTime, Diff)>,
    /// The latest frontier this handle has written up to, shared with the
    /// rest of the ingestion dataflow.
    shared_write_frontier: Rc<RefCell<Antichain<IntoTime>>>,
}
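
// How a caller might drive this handle, as a rough sketch; `handle`, `apply`,
// and `mint_bindings` are hypothetical names, not part of this module:
//
//     while let Some((bindings, upper)) = handle.next().await {
//         apply(&bindings); // fold the bindings into local reclocking state
//         let (new, new_upper) = mint_bindings(&upper);
//         handle.compare_and_append(new, upper, new_upper).await?;
//     }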

impl<FromTime: Timestamp, IntoTime: Timestamp + Sync> PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + TotalOrder + Lattice + Codec64,
{
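    /// Opens a `PersistHandle` for the remap shard described by `metadata`,
    /// reading from `as_of` onwards. Panics if the collection has no remap
    /// shard and errors if `as_of` is not readable.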
    pub async fn new(
        persist_clients: Arc<PersistClientCache>,
        read_only_rx: watch::Receiver<bool>,
        metadata: CollectionMetadata,
        as_of: Antichain<IntoTime>,
        shared_write_frontier: Rc<RefCell<Antichain<IntoTime>>>,
        id: GlobalId,
        operator: &str,
        worker_id: usize,
        worker_count: usize,
        remap_relation_desc: RelationDesc,
        remap_collection_id: GlobalId,
    ) -> anyhow::Result<Self> {
        let remap_shard = if let Some(remap_shard) = metadata.remap_shard {
            remap_shard
        } else {
            panic!(
                "cannot create remap PersistHandle for collection without remap shard: {id}, metadata: {:?}",
                metadata
            );
        };

        let persist_client = persist_clients
            .open(metadata.persist_location.clone())
            .await
            .context("error creating persist client")?;

        let (write_handle, mut read_handle) = persist_client
            .open(
                remap_shard,
                Arc::new(remap_relation_desc),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: remap_collection_id.to_string(),
                    handle_purpose: format!("reclock for {}", id),
                },
                false,
            )
            .await
            .expect("invalid usage");

        let upper = write_handle.upper();
        let since = read_handle.since();

        // Test-only failure injection point (see the `fail` crate).
        fail_point!("invalid_remap_as_of");
        if since.is_empty() {
            // An empty since means the shard can no longer be read, which
            // most likely means the source is in the process of being
            // dropped. Suspend and wait to be shut down instead of crashing;
            // the soft panic below only fires if the (very long) sleep
            // elapses first.
            tracing::info!(
                source_id = %id,
                %worker_id,
                "since of remap shard is the empty antichain, suspending...");

            tokio::time::sleep(Duration::from_secs(5 * 60 * 60)).await;

            soft_panic_or_log!(
                "since of remap shard is the empty antichain, source_id = {id}, worker_id = {worker_id}"
            );
        }

        // The as_of must be readable: since <= as_of.
        if !PartialOrder::less_equal(since, &as_of) {
            anyhow::bail!(
                "invalid as_of: as_of({as_of:?}) < since({since:?}), \
                source {id}, \
                remap_shard: {:?}",
                metadata.remap_shard
            );
        }

        // Unless it is the minimum frontier, the as_of must also be strictly
        // less than the upper, so that the snapshot below is complete.
        assert!(
            as_of.elements() == [IntoTime::minimum()] || PartialOrder::less_than(&as_of, upper),
            "invalid as_of: upper({upper:?}) <= as_of({as_of:?})",
        );

        tracing::info!(
            ?since,
            ?as_of,
            ?upper,
            "{operator}({id}) {worker_id}/{worker_count} initializing PersistHandle"
        );

        use futures::stream;
        let events = stream::once(async move {
            // First yield the snapshot of the shard at `as_of`, then follow
            // it with a listen stream of all subsequent updates.
            let updates = read_handle
                .snapshot_and_fetch(as_of.clone())
                .await
                .expect("since <= as_of asserted");
            let snapshot = stream::once(std::future::ready(ListenEvent::Updates(updates)));

            let listener = read_handle
                .listen(as_of.clone())
                .await
                .expect("since <= as_of asserted");

            let listen_stream = stream::unfold(listener, |mut listener| async move {
                let events = stream::iter(listener.fetch_next().await);
                Some((events, listener))
            })
            .flatten();

            snapshot.chain(listen_stream)
        })
        .flatten()
        .boxed_local();

        Ok(Self {
            events,
            write_handle,
            read_only_rx,
            pending_batch: vec![],
            shared_write_frontier,
        })
    }
}

#[async_trait::async_trait(?Send)]
impl<FromTime, IntoTime> RemapHandleReader for PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + Lattice + Codec64,
{
    type FromTime = FromTime;
    type IntoTime = IntoTime;

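    /// Waits for the next progress event of the remap shard and returns all
    /// bindings that are not beyond the new frontier, together with that
    /// frontier.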
    async fn next(
        &mut self,
    ) -> Option<(
        Vec<(Self::FromTime, Self::IntoTime, Diff)>,
        Antichain<Self::IntoTime>,
    )> {
        while let Some(event) = self.events.next().await {
            match event {
                ListenEvent::Progress(new_upper) => {
                    // Peel off a batch of pending bindings that are not
                    // beyond the new upper and emit them with that frontier.
                    let batch = self
                        .pending_batch
                        .extract_if(.., |(_, ts, _)| !new_upper.less_equal(ts))
                        .collect();
                    return Some((batch, new_upper));
                }
                ListenEvent::Updates(msgs) => {
                    for ((update, _), into_ts, diff) in msgs {
                        let from_ts = FromTime::decode_row(
                            &update.expect("invalid row").0.expect("invalid row"),
                        );
                        self.pending_batch.push((from_ts, into_ts, diff.into()));
                    }
                }
            }
        }
        None
    }
}
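
// For example (illustrative values): if `pending_batch` holds bindings at
// IntoTime 3 and 7 and a Progress event arrives with frontier {5}, `next`
// returns the binding at 3 together with {5}; the binding at 7 stays pending
// until the frontier passes it.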

#[async_trait::async_trait(?Send)]
impl<FromTime, IntoTime> RemapHandle for PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
{
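    /// Appends `updates` to the remap shard, transitioning its upper from
    /// `upper` to `new_upper`. Illustrative contract (hypothetical values): a
    /// call with `upper = {5}` succeeds only if the shard's upper is exactly
    /// {5} at the time of the write; otherwise it returns an `UpperMismatch`
    /// carrying the shard's actual upper so the caller can sync and retry.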
    async fn compare_and_append(
        &mut self,
        updates: Vec<(Self::FromTime, Self::IntoTime, Diff)>,
        upper: Antichain<Self::IntoTime>,
        new_upper: Antichain<Self::IntoTime>,
    ) -> Result<(), UpperMismatch<Self::IntoTime>> {
        if *self.read_only_rx.borrow() {
            // We are not allowed to write, so wait until either we come out
            // of read-only mode or some other, writable instance advances
            // the shard's upper past the upper we were asked to write at.
            loop {
                tracing::trace!(
                    ?upper,
                    ?new_upper,
                    persist_upper = ?self.write_handle.upper(),
                    "persist remap handle is in read-only mode, waiting until we come out of it or the shard upper advances");

                // Wait for a change notification, but with a timeout so we
                // also periodically re-check the shard's upper below.
                let _ =
                    tokio::time::timeout(Duration::from_secs(1), self.read_only_rx.changed()).await;

                if !*self.read_only_rx.borrow() {
                    tracing::trace!(
                        ?upper,
                        ?new_upper,
                        persist_upper = ?self.write_handle.upper(),
                        "persist remap handle has come out of read-only mode"
                    );

                    break;
                }

                let current_upper = self.write_handle.fetch_recent_upper().await;

                if PartialOrder::less_than(&upper, current_upper) {
                    tracing::trace!(
                        ?upper,
                        ?new_upper,
                        persist_upper = ?current_upper,
                        "someone else advanced the upper, aborting write"
                    );

                    // Report this as an upper mismatch, as if our write had
                    // raced with the other writer and lost.
                    return Err(UpperMismatch {
                        current: current_upper.clone(),
                        expected: upper,
                    });
                }
            }
        }

        let row_updates = updates.into_iter().map(|(from_ts, into_ts, diff)| {
            (
                (SourceData(Ok(from_ts.encode_row())), ()),
                into_ts,
                diff.into_inner(),
            )
        });

        match self
            .write_handle
            .compare_and_append(row_updates, upper, new_upper.clone())
            .await
        {
            Ok(result) => {
                *self.shared_write_frontier.borrow_mut() = new_upper;
                return result;
            }
            Err(invalid_use) => panic!("compare_and_append failed: {invalid_use}"),
        }
    }

    fn upper(&self) -> &Antichain<Self::IntoTime> {
        self.write_handle.upper()
    }
}