// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reclocking compatibility code until the whole ingestion pipeline is transformed to native
//! timestamps
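//!
//! The remap shard stores "bindings" that map a source's native timestamps
//! (`FromTime`) into the target time domain (`IntoTime`). Each binding is a
//! differential update of shape `(FromTime, IntoTime, Diff)`, as handled by
//! the `pending_batch` buffer below. As a purely illustrative sketch (the
//! concrete types and values vary per source), advancing a hypothetical
//! partition's binding from offset 17 to offset 42 at `IntoTime` 1000 would
//! appear as the update pair:
//!
//! ```text
//! ((partition 0, offset 42), 1000, +1)
//! ((partition 0, offset 17), 1000, -1)
//! ```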

use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::stream::LocalBoxStream;
use mz_ore::soft_panic_or_log;
use mz_ore::vec::VecExt;
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::read::ListenEvent;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, RelationDesc};
use mz_storage_client::util::remap_handle::{RemapHandle, RemapHandleReader};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::sources::{SourceData, SourceTimestamp};
use timely::order::PartialOrder;
use timely::progress::Timestamp;
use timely::progress::frontier::Antichain;
use tokio::sync::watch;

/// A handle to a persist shard that stores remap bindings.
pub struct PersistHandle<FromTime: SourceTimestamp, IntoTime: Timestamp + Lattice + Codec64> {
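    /// The stream of binding events: a snapshot as of the `as_of` passed to
    /// [`PersistHandle::new`], followed by a continuous listen.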
    events: LocalBoxStream<
        'static,
        ListenEvent<
            IntoTime,
            (
                (Result<SourceData, String>, Result<(), String>),
                IntoTime,
                StorageDiff,
            ),
        >,
    >,
    write_handle: WriteHandle<SourceData, (), IntoTime, StorageDiff>,
    /// Whether or not this handle is in read-only mode.
    read_only_rx: watch::Receiver<bool>,
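    /// Bindings that have been read from the shard but not yet handed out,
    /// because no progress event has moved past their timestamps yet.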
    pending_batch: Vec<(FromTime, IntoTime, Diff)>,
    // Reports `self`'s write frontier.
    shared_write_frontier: Rc<RefCell<Antichain<IntoTime>>>,
}

impl<FromTime: Timestamp, IntoTime: Timestamp + Sync> PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + Lattice + Codec64,
{
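    /// Opens the remap shard described by `metadata` and returns a handle to
    /// it, validated against the given `as_of`. The handle's event stream
    /// yields a snapshot of the bindings as of `as_of`, followed by a listen
    /// for all bindings written afterwards.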
    pub async fn new(
        persist_clients: Arc<PersistClientCache>,
        read_only_rx: watch::Receiver<bool>,
        metadata: CollectionMetadata,
        as_of: Antichain<IntoTime>,
        shared_write_frontier: Rc<RefCell<Antichain<IntoTime>>>,
        // Additional information to improve logging.
        id: GlobalId,
        operator: &str,
        worker_id: usize,
        worker_count: usize,
        // Must match the `FromTime`. Ideally we would be able to discover this
        // from `SourceTimestamp`, but each source would need a specific
        // `SourceTimestamp` implementation, as they do not share remap
        // `RelationDesc`s (column names are different).
        //
        // TODO(guswynn): use the type-system to prevent misuse here.
        remap_relation_desc: RelationDesc,
        remap_collection_id: GlobalId,
    ) -> anyhow::Result<Self> {
        let remap_shard = if let Some(remap_shard) = metadata.remap_shard {
            remap_shard
        } else {
            panic!(
                "cannot create remap PersistHandle for collection without remap shard: {id}, metadata: {:?}",
                metadata
            );
        };

        let persist_client = persist_clients
            .open(metadata.persist_location.clone())
            .await
            .context("error creating persist client")?;

        let (write_handle, mut read_handle) = persist_client
            .open(
                remap_shard,
                Arc::new(remap_relation_desc),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: remap_collection_id.to_string(),
                    handle_purpose: format!("reclock for {}", id),
                },
                false,
            )
            .await
            .expect("invalid usage");

        let upper = write_handle.upper();
        // We want a leased reader because elsewhere in the code the `as_of`
        // time may also be determined by another `ReadHandle`, and the pair of
        // them offer the invariant that we need (that this `since` is <= the
        // `as_of`). Using a `SinceHandle` here does not offer the same
        // invariant when paired with a `ReadHandle`.
        let since = read_handle.since();

        // Allow manually simulating the scenario where the since of the remap
        // shard has advanced too far.
        fail_point!("invalid_remap_as_of");

        if since.is_empty() {
            // This can happen when, say, a source is being dropped but this
            // cluster is busy and only notices later. In that case we may
            // still try to render an ingestion that is no longer valid and
            // whose shards are no longer valid to use.
            //
            // This is a rare race condition and something that is expected to
            // happen every now and then. It's not a bug in how things
            // currently work.
            tracing::info!(
                source_id = %id,
                %worker_id,
                "since of remap shard is the empty antichain, suspending..."
            );

            // We wait 5 hours to give the commands a chance to arrive at this
            // replica and for it to drop our dataflow.
            tokio::time::sleep(Duration::from_secs(5 * 60 * 60)).await;

            // If we're still here after 5 hours, something has gone wrong and
            // we complain.
            soft_panic_or_log!(
                "since of remap shard is the empty antichain, source_id = {id}, worker_id = {worker_id}"
            );
        }

        if !PartialOrder::less_equal(since, &as_of) {
            anyhow::bail!(
                "invalid as_of: as_of({as_of:?}) < since({since:?}), \
                source {id}, \
                remap_shard: {:?}",
                metadata.remap_shard
            );
        }
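
        // Unless `as_of` is exactly the minimum timestamp (a brand-new
        // collection), it must be strictly less than the shard's upper, so
        // that the snapshot taken below can complete.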
        assert!(
            as_of.elements() == [IntoTime::minimum()] || PartialOrder::less_than(&as_of, upper),
            "invalid as_of: upper({upper:?}) <= as_of({as_of:?})",
        );

        tracing::info!(
            ?since,
            ?as_of,
            ?upper,
            "{operator}({id}) {worker_id}/{worker_count} initializing PersistHandle"
        );
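
        // Stitch together a single event stream: first the snapshot of all
        // bindings as of `as_of`, then a listen that follows every update
        // past `as_of`.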
        use futures::stream;
        let events = stream::once(async move {
            let updates = read_handle
                .snapshot_and_fetch(as_of.clone())
                .await
                .expect("since <= as_of asserted");
            let snapshot = stream::once(std::future::ready(ListenEvent::Updates(updates)));

            let listener = read_handle
                .listen(as_of.clone())
                .await
                .expect("since <= as_of asserted");

            let listen_stream = stream::unfold(listener, |mut listener| async move {
                let events = stream::iter(listener.fetch_next().await);
                Some((events, listener))
            })
            .flatten();

            snapshot.chain(listen_stream)
        })
        .flatten()
        .boxed_local();

        Ok(Self {
            events,
            write_handle,
            read_only_rx,
            pending_batch: vec![],
            shared_write_frontier,
        })
    }
}

#[async_trait::async_trait(?Send)]
impl<FromTime, IntoTime> RemapHandleReader for PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + Lattice + Codec64,
{
    type FromTime = FromTime;
    type IntoTime = IntoTime;
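
    /// Waits for the next progress event from the shard and returns all
    /// buffered bindings that are not beyond the new upper, together with
    /// that upper. Returns `None` when the event stream is exhausted.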
    async fn next(
        &mut self,
    ) -> Option<(
        Vec<(Self::FromTime, Self::IntoTime, Diff)>,
        Antichain<Self::IntoTime>,
    )> {
        while let Some(event) = self.events.next().await {
            match event {
                ListenEvent::Progress(new_upper) => {
                    // Peel off a batch of pending data.
                    let batch = self
                        .pending_batch
                        .drain_filter_swapping(|(_, ts, _)| !new_upper.less_equal(ts))
                        .collect();
                    return Some((batch, new_upper));
                }
                ListenEvent::Updates(msgs) => {
                    for ((update, _), into_ts, diff) in msgs {
                        let from_ts = FromTime::decode_row(
                            &update.expect("invalid row").0.expect("invalid row"),
                        );
                        self.pending_batch.push((from_ts, into_ts, diff.into()));
                    }
                }
            }
        }
        None
    }
}

#[async_trait::async_trait(?Send)]
impl<FromTime, IntoTime> RemapHandle for PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + Lattice + Codec64 + Sync,
{
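    /// Appends `updates` to the remap shard, expecting its current upper to
    /// be `upper` and advancing it to `new_upper`. While in read-only mode
    /// this instead waits until either the handle leaves read-only mode or
    /// some other writer advances the shard's upper, in which case an
    /// `UpperMismatch` is returned.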
    async fn compare_and_append(
        &mut self,
        updates: Vec<(Self::FromTime, Self::IntoTime, Diff)>,
        upper: Antichain<Self::IntoTime>,
        new_upper: Antichain<Self::IntoTime>,
    ) -> Result<(), UpperMismatch<Self::IntoTime>> {
        if *self.read_only_rx.borrow() {
            // We have to wait for either us coming out of read-only mode or
            // someone else advancing the upper. If we just returned an
            // `UpperMismatch` while in read-only mode, we would go into a busy
            // loop because we'd be called over and over again. One presumes.

            loop {
                tracing::trace!(
                    ?upper,
                    ?new_upper,
                    persist_upper = ?self.write_handle.upper(),
                    "persist remap handle is in read-only mode, waiting until we come out of it or the shard upper advances"
                );

                // We don't try to be too smart here, and for example use
                // `wait_for_upper_past()`. We'd have to use a `select!`,
                // which would require cancel safety of `wait_for_upper_past()`,
                // which it doesn't advertise.
                let _ =
                    tokio::time::timeout(Duration::from_secs(1), self.read_only_rx.changed()).await;

                if !*self.read_only_rx.borrow() {
                    tracing::trace!(
                        ?upper,
                        ?new_upper,
                        persist_upper = ?self.write_handle.upper(),
                        "persist remap handle has come out of read-only mode"
                    );

                    // It's okay to write now.
                    break;
                }

                let current_upper = self.write_handle.fetch_recent_upper().await;

                if PartialOrder::less_than(&upper, current_upper) {
                    tracing::trace!(
                        ?upper,
                        ?new_upper,
                        persist_upper = ?current_upper,
                        "someone else advanced the upper, aborting write"
                    );

                    return Err(UpperMismatch {
                        current: current_upper.clone(),
                        expected: upper,
                    });
                }
            }
        }
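
        // Encode the source timestamps as rows, turning each binding into the
        // `SourceData` representation stored in the remap shard.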
        let row_updates = updates.into_iter().map(|(from_ts, into_ts, diff)| {
            (
                (SourceData(Ok(from_ts.encode_row())), ()),
                into_ts,
                diff.into_inner(),
            )
        });

        match self
            .write_handle
            .compare_and_append(row_updates, upper, new_upper.clone())
            .await
        {
            Ok(result) => {
                *self.shared_write_frontier.borrow_mut() = new_upper;
                return result;
            }
            Err(invalid_use) => panic!("compare_and_append failed: {invalid_use}"),
        }
    }
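
    /// Reports the upper of the underlying remap shard.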
    fn upper(&self) -> &Antichain<Self::IntoTime> {
        self.write_handle.upper()
    }
}