// mz_storage/source/reclock/compat.rs

use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::stream::LocalBoxStream;
use mz_ore::soft_panic_or_log;
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::read::ListenEvent;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, RelationDesc};
use mz_storage_client::util::remap_handle::{RemapHandle, RemapHandleReader};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::sources::{SourceData, SourceTimestamp};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::Timestamp;
use timely::progress::frontier::Antichain;
use tokio::sync::watch;

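/// A [`RemapHandle`] backed by a persist shard that stores the remap
/// collection of a source, i.e. the durable bindings from `FromTime` to
/// `IntoTime`.
///
/// A rough usage sketch (arguments elided, not a doctest):
///
/// ```ignore
/// let mut handle = PersistHandle::new(/* ... */).await?;
/// // Read the next batch of remap bindings together with the frontier up to
/// // which they are complete.
/// if let Some((batch, upper)) = handle.next().await { /* ... */ }
/// ```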
pub struct PersistHandle<FromTime: SourceTimestamp, IntoTime: Timestamp + Lattice + Codec64> {
    /// The stream of events from the remap shard: an initial snapshot followed
    /// by a listener for all subsequent updates.
    events:
        LocalBoxStream<'static, ListenEvent<IntoTime, ((SourceData, ()), IntoTime, StorageDiff)>>,
    write_handle: WriteHandle<SourceData, (), IntoTime, StorageDiff>,
    read_only_rx: watch::Receiver<bool>,
    /// Updates that have been read from `events` but are not yet beneath a
    /// reported upper.
    pending_batch: Vec<(FromTime, IntoTime, Diff)>,
    /// The write frontier of the remap shard, shared with whoever owns this
    /// handle.
    shared_write_frontier: Rc<RefCell<Antichain<IntoTime>>>,
}

impl<FromTime, IntoTime> PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
{
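    /// Opens the remap shard described by `remap_metadata` and prepares the
    /// event stream, starting with a snapshot at `as_of` followed by a
    /// listener for all later updates.
    ///
    /// Returns an error if the persist client cannot be created or if `as_of`
    /// is not readable from the shard.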
    pub async fn new(
        persist_clients: Arc<PersistClientCache>,
        read_only_rx: watch::Receiver<bool>,
        remap_metadata: CollectionMetadata,
        as_of: Antichain<IntoTime>,
        shared_write_frontier: Rc<RefCell<Antichain<IntoTime>>>,
        id: GlobalId,
        // Name of the enclosing operator, used only for logging.
        operator: &str,
        worker_id: usize,
        worker_count: usize,
        remap_relation_desc: RelationDesc,
        // The id of the remap collection itself, used for persist diagnostics.
        remap_collection_id: GlobalId,
    ) -> anyhow::Result<Self> {
        let persist_client = persist_clients
            .open(remap_metadata.persist_location.clone())
            .await
            .context("error creating persist client")?;

        let (write_handle, mut read_handle) = persist_client
            .open(
                remap_metadata.data_shard,
                Arc::new(remap_relation_desc),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: remap_collection_id.to_string(),
                    handle_purpose: format!("reclock for {}", id),
                },
                false,
            )
            .await
            .expect("invalid usage");

        let upper = write_handle.upper();
        let since = read_handle.since();

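        // Failpoint so tests can inject a failure at this point.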
        fail_point!("invalid_remap_as_of");

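        // An empty since means the shard can no longer be read, so this
        // process cannot make progress. Sleep to give the rest of the system
        // time to shut us down gracefully before escalating to an error.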
        if since.is_empty() {
            tracing::info!(
                source_id = %id,
                %worker_id,
                "since of remap shard is the empty antichain, suspending...");

            tokio::time::sleep(Duration::from_secs(5 * 60 * 60)).await;

            soft_panic_or_log!(
                "since of remap shard is the empty antichain, source_id = {id}, worker_id = {worker_id}"
            );
        }

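        // We can only read the shard if it has not been compacted beyond the
        // requested as_of.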
        if !PartialOrder::less_equal(since, &as_of) {
            anyhow::bail!(
                "invalid as_of: as_of({as_of:?}) < since({since:?}), \
                source {id}, \
                remap_shard: {:?}",
                remap_metadata.data_shard
            );
        }

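        // Unless we are starting at the minimum timestamp, the as_of must be
        // strictly less than the upper of the shard.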
        assert!(
            as_of.elements() == [IntoTime::minimum()] || PartialOrder::less_than(&as_of, upper),
            "invalid as_of: upper({upper:?}) <= as_of({as_of:?})",
        );

        tracing::info!(
            ?since,
            ?as_of,
            ?upper,
            "{operator}({id}) {worker_id}/{worker_count} initializing PersistHandle"
        );

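        // Build the event stream: a one-shot snapshot at `as_of`, chained with
        // a listener that tails all updates past the snapshot.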
        use futures::stream;
        let events = stream::once(async move {
            let updates = read_handle
                .snapshot_and_fetch(as_of.clone())
                .await
                .expect("since <= as_of asserted");
            let snapshot = stream::once(std::future::ready(ListenEvent::Updates(updates)));

            let listener = read_handle
                .listen(as_of.clone())
                .await
                .expect("since <= as_of asserted");

            let listen_stream = stream::unfold(listener, |mut listener| async move {
                let events = stream::iter(listener.fetch_next().await);
                Some((events, listener))
            })
            .flatten();

            snapshot.chain(listen_stream)
        })
        .flatten()
        .boxed_local();

        Ok(Self {
            events,
            write_handle,
            read_only_rx,
            pending_batch: vec![],
            shared_write_frontier,
        })
    }
}

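// The reading half: assembles persist events into batches of remap updates,
// each paired with the frontier up to which the batch is complete.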
#[async_trait::async_trait(?Send)]
impl<FromTime, IntoTime> RemapHandleReader for PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + Lattice + Codec64,
{
    type FromTime = FromTime;
    type IntoTime = IntoTime;

    async fn next(
        &mut self,
    ) -> Option<(
        Vec<(Self::FromTime, Self::IntoTime, Diff)>,
        Antichain<Self::IntoTime>,
    )> {
        while let Some(event) = self.events.next().await {
            match event {
                ListenEvent::Progress(new_upper) => {
                    // A progress event means that all updates not beyond
                    // `new_upper` are complete, so emit them as one batch.
                    let batch = self
                        .pending_batch
                        .extract_if(.., |(_, ts, _)| !new_upper.less_equal(ts))
                        .collect();
                    return Some((batch, new_upper));
                }
                ListenEvent::Updates(msgs) => {
                    // Stash updates until a progress event closes them out.
                    for ((update, _), into_ts, diff) in msgs {
                        let from_ts = FromTime::decode_row(&update.0.expect("invalid row"));
                        self.pending_batch.push((from_ts, into_ts, diff.into()));
                    }
                }
            }
        }
        None
    }
}

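// The writing half: durably appends new remap bindings to the shard.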
#[async_trait::async_trait(?Send)]
impl<FromTime, IntoTime> RemapHandle for PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
{
    async fn compare_and_append(
        &mut self,
        updates: Vec<(Self::FromTime, Self::IntoTime, Diff)>,
        upper: Antichain<Self::IntoTime>,
        new_upper: Antichain<Self::IntoTime>,
    ) -> Result<(), UpperMismatch<Self::IntoTime>> {
        if *self.read_only_rx.borrow() {
            loop {
                // We are in read-only mode and may not write to the shard.
                // Wait until we either come out of read-only mode or another
                // process advances the upper, which makes our write obsolete.
                tracing::trace!(
                    ?upper,
                    ?new_upper,
                    persist_upper = ?self.write_handle.upper(),
                    "persist remap handle is in read-only mode, waiting until we come out of it or the shard upper advances");

                // Wait for a change notification, but re-check the shard upper
                // at least once a second.
                let _ =
                    tokio::time::timeout(Duration::from_secs(1), self.read_only_rx.changed()).await;

                if !*self.read_only_rx.borrow() {
                    tracing::trace!(
                        ?upper,
                        ?new_upper,
                        persist_upper = ?self.write_handle.upper(),
                        "persist remap handle has come out of read-only mode"
                    );

                    break;
                }

                let current_upper = self.write_handle.fetch_recent_upper().await;

                if PartialOrder::less_than(&upper, current_upper) {
                    tracing::trace!(
                        ?upper,
                        ?new_upper,
                        persist_upper = ?current_upper,
                        "someone else advanced the upper, aborting write"
                    );

                    return Err(UpperMismatch {
                        current: current_upper.clone(),
                        expected: upper,
                    });
                }
            }
        }

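        // Encode the source timestamps as rows for the persist shard.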
        let row_updates = updates.into_iter().map(|(from_ts, into_ts, diff)| {
            (
                (SourceData(Ok(from_ts.encode_row())), ()),
                into_ts,
                diff.into_inner(),
            )
        });

        match self
            .write_handle
            .compare_and_append(row_updates, upper, new_upper.clone())
            .await
        {
            Ok(result) => {
                // On success, publish the new upper to the shared frontier.
                *self.shared_write_frontier.borrow_mut() = new_upper;
                return result;
            }
            Err(invalid_use) => panic!("compare_and_append failed: {invalid_use}"),
        }
    }

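    /// The upper of the remap shard, as observed by our write handle.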
    fn upper(&self) -> &Antichain<Self::IntoTime> {
        self.write_handle.upper()
    }
}