mz_storage/source/reclock/compat.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reclocking compatibility code until the whole ingestion pipeline is transformed to native
//! timestamps
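//!
//! A remap shard stores "bindings": updates that associate a source-specific
//! timestamp (`FromTime`, e.g. a Kafka offset) with a Materialize timestamp
//! (`IntoTime`). A minimal sketch of such updates, assuming a hypothetical
//! offset-based `FromTime` (illustrative only, not the on-disk encoding):
//!
//! ```ignore
//! // Retract the previous source frontier and assert the new one, both at
//! // IntoTime 1000:
//! let bindings = vec![
//!     (MzOffset::from(17), Timestamp::from(1000), -1),
//!     (MzOffset::from(42), Timestamp::from(1000), 1),
//! ];
//! ```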

use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::stream::LocalBoxStream;
use mz_ore::soft_panic_or_log;
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::read::ListenEvent;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, RelationDesc};
use mz_storage_client::util::remap_handle::{RemapHandle, RemapHandleReader};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::sources::{SourceData, SourceTimestamp};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::Timestamp;
use timely::progress::frontier::Antichain;
use tokio::sync::watch;

/// A handle to a persist shard that stores remap bindings
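///
/// A minimal sketch of driving this handle through [`RemapHandleReader`]
/// (the real pipeline does this inside the reclock operators; `MzOffset` as
/// the `FromTime` is illustrative):
///
/// ```ignore
/// let mut handle: PersistHandle<MzOffset, Timestamp> = /* via `new` */;
/// while let Some((bindings, upper)) = handle.next().await {
///     for (from_ts, into_ts, diff) in bindings {
///         // feed each binding into the reclocking logic
///     }
///     // the remap shard is now complete up to `upper`
/// }
/// ```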
pub struct PersistHandle<FromTime: SourceTimestamp, IntoTime: Timestamp + Lattice + Codec64> {
    events: LocalBoxStream<
        'static,
        ListenEvent<
            IntoTime,
            (
                (Result<SourceData, String>, Result<(), String>),
                IntoTime,
                StorageDiff,
            ),
        >,
    >,
    write_handle: WriteHandle<SourceData, (), IntoTime, StorageDiff>,
    /// Whether or not this handle is in read-only mode.
    read_only_rx: watch::Receiver<bool>,
    pending_batch: Vec<(FromTime, IntoTime, Diff)>,
    // Reports `self`'s write frontier.
    shared_write_frontier: Rc<RefCell<Antichain<IntoTime>>>,
}

impl<FromTime, IntoTime> PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
{
    pub async fn new(
        persist_clients: Arc<PersistClientCache>,
        read_only_rx: watch::Receiver<bool>,
        metadata: CollectionMetadata,
        as_of: Antichain<IntoTime>,
        shared_write_frontier: Rc<RefCell<Antichain<IntoTime>>>,
        // additional information to improve logging
        id: GlobalId,
        operator: &str,
        worker_id: usize,
        worker_count: usize,
        // Must match the `FromTime`. Ideally we would be able to discover this
        // from `SourceTimestamp`, but each source would need a specific `SourceTimestamp`
        // implementation, as they do not share remap `RelationDesc`'s (column names
        // are different).
        //
        // TODO(guswynn): use the type-system to prevent misuse here.
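        // (For example, Kafka's remap `RelationDesc` encodes its `FromTime`
        // as partition/offset columns, which other sources do not share.)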
        remap_relation_desc: RelationDesc,
        remap_collection_id: GlobalId,
    ) -> anyhow::Result<Self> {
        let remap_shard = if let Some(remap_shard) = metadata.remap_shard {
            remap_shard
        } else {
            panic!(
                "cannot create remap PersistHandle for collection without remap shard: {id}, metadata: {:?}",
                metadata
            );
        };

        let persist_client = persist_clients
            .open(metadata.persist_location.clone())
            .await
            .context("error creating persist client")?;

        let (write_handle, mut read_handle) = persist_client
            .open(
                remap_shard,
                Arc::new(remap_relation_desc),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: remap_collection_id.to_string(),
                    handle_purpose: format!("reclock for {}", id),
                },
                false,
            )
            .await
            .expect("invalid usage");
        let upper = write_handle.upper();
        // We want a leased reader because elsewhere in the code the `as_of`
        // time may also be determined by another `ReadHandle`, and the pair of
        // them offer the invariant that we need (that this `since` is <= the
        // `as_of`). Using a `SinceHandle` here does not offer the same
        // invariant when paired with a `ReadHandle`.
        let since = read_handle.since();

        // Allow manually simulating the scenario where the since of the remap
        // shard has advanced too far.
        fail_point!("invalid_remap_as_of");

        if since.is_empty() {
            // This can happen when, say, a source is being dropped but this
            // cluster is busy and only notices later. In that case we may
            // still try to render an ingestion that is no longer valid and
            // whose shards are no longer valid to use.
            //
            // This is a rare race condition and something that is expected to
            // happen every now and then. It's not a bug in how things
            // currently work.
            tracing::info!(
                source_id = %id,
                %worker_id,
                "since of remap shard is the empty antichain, suspending...");

            // We wait 5 hours to give the commands a chance to arrive at this
            // replica and for it to drop our dataflow.
            tokio::time::sleep(Duration::from_secs(5 * 60 * 60)).await;

            // If we're still here after 5 hours, something has gone wrong and
            // we complain.
            soft_panic_or_log!(
                "since of remap shard is the empty antichain, source_id = {id}, worker_id = {worker_id}"
            );
        }

        if !PartialOrder::less_equal(since, &as_of) {
            anyhow::bail!(
                "invalid as_of: as_of({as_of:?}) < since({since:?}), \
                source {id}, \
                remap_shard: {:?}",
                metadata.remap_shard
            );
        }

        assert!(
            as_of.elements() == [IntoTime::minimum()] || PartialOrder::less_than(&as_of, upper),
            "invalid as_of: upper({upper:?}) <= as_of({as_of:?})",
        );

        tracing::info!(
            ?since,
            ?as_of,
            ?upper,
            "{operator}({id}) {worker_id}/{worker_count} initializing PersistHandle"
        );

        use futures::stream;
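        // First replay the snapshot as of `as_of`, then follow the shard with
        // a listener; flattening the two yields one continuous stream of
        // `ListenEvent`s.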
        let events = stream::once(async move {
            let updates = read_handle
                .snapshot_and_fetch(as_of.clone())
                .await
                .expect("since <= as_of asserted");
            let snapshot = stream::once(std::future::ready(ListenEvent::Updates(updates)));

            let listener = read_handle
                .listen(as_of.clone())
                .await
                .expect("since <= as_of asserted");

            let listen_stream = stream::unfold(listener, |mut listener| async move {
                let events = stream::iter(listener.fetch_next().await);
                Some((events, listener))
            })
            .flatten();

            snapshot.chain(listen_stream)
        })
        .flatten()
        .boxed_local();

        Ok(Self {
            events,
            write_handle,
            read_only_rx,
            pending_batch: vec![],
            shared_write_frontier,
        })
    }
}

#[async_trait::async_trait(?Send)]
impl<FromTime, IntoTime> RemapHandleReader for PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + Lattice + Codec64,
{
    type FromTime = FromTime;
    type IntoTime = IntoTime;

    async fn next(
        &mut self,
    ) -> Option<(
        Vec<(Self::FromTime, Self::IntoTime, Diff)>,
        Antichain<Self::IntoTime>,
    )> {
        while let Some(event) = self.events.next().await {
            match event {
                ListenEvent::Progress(new_upper) => {
                    // Peel off a batch of pending data
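                    // E.g. with pending bindings at times 3 and 7 and
                    // `new_upper = [5]`, the binding at 3 is returned now
                    // while the one at 7 stays pending.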
                    let batch = self
                        .pending_batch
                        .extract_if(.., |(_, ts, _)| !new_upper.less_equal(ts))
                        .collect();
                    return Some((batch, new_upper));
                }
                ListenEvent::Updates(msgs) => {
                    for ((update, _), into_ts, diff) in msgs {
                        let from_ts = FromTime::decode_row(
                            &update.expect("invalid row").0.expect("invalid row"),
                        );
                        self.pending_batch.push((from_ts, into_ts, diff.into()));
                    }
                }
            }
        }
        None
    }
}

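// A minimal sketch of how a caller might drive `compare_and_append`,
// retrying on `UpperMismatch` (a sketch only, not how the ingestion pipeline
// actually reacts to mismatches):
//
// ```ignore
// let mut upper = handle.upper().clone();
// loop {
//     match handle.compare_and_append(bindings.clone(), upper, new_upper.clone()).await {
//         Ok(()) => break,
//         Err(UpperMismatch { current, .. }) => upper = current,
//     }
// }
// ```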
#[async_trait::async_trait(?Send)]
impl<FromTime, IntoTime> RemapHandle for PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
{
    async fn compare_and_append(
        &mut self,
        updates: Vec<(Self::FromTime, Self::IntoTime, Diff)>,
        upper: Antichain<Self::IntoTime>,
        new_upper: Antichain<Self::IntoTime>,
    ) -> Result<(), UpperMismatch<Self::IntoTime>> {
        if *self.read_only_rx.borrow() {
            // We have to wait until either we come out of read-only mode or
            // someone else advances the upper. If we just returned an
            // `UpperMismatch` while in read-only mode, we would go into a busy
            // loop because we'd be called over and over again.

            loop {
                tracing::trace!(
                    ?upper,
                    ?new_upper,
                    persist_upper = ?self.write_handle.upper(),
                    "persist remap handle is in read-only mode, waiting until we come out of it or the shard upper advances");

                // We don't try to be too smart here by, for example, using
                // `wait_for_upper_past()`: we'd have to use a `select!`, which
                // would require `wait_for_upper_past()` to be cancel safe,
                // which it doesn't advertise.
                let _ =
                    tokio::time::timeout(Duration::from_secs(1), self.read_only_rx.changed()).await;

                if !*self.read_only_rx.borrow() {
                    tracing::trace!(
                        ?upper,
                        ?new_upper,
                        persist_upper = ?self.write_handle.upper(),
                        "persist remap handle has come out of read-only mode"
                    );

                    // It's okay to write now.
                    break;
                }

                let current_upper = self.write_handle.fetch_recent_upper().await;

                if PartialOrder::less_than(&upper, current_upper) {
                    tracing::trace!(
                        ?upper,
                        ?new_upper,
                        persist_upper = ?current_upper,
                        "someone else advanced the upper, aborting write"
                    );

                    return Err(UpperMismatch {
                        current: current_upper.clone(),
                        expected: upper,
                    });
                }
            }
        }

        let row_updates = updates.into_iter().map(|(from_ts, into_ts, diff)| {
            (
                (SourceData(Ok(from_ts.encode_row())), ()),
                into_ts,
                diff.into_inner(),
            )
        });

        match self
            .write_handle
            .compare_and_append(row_updates, upper, new_upper.clone())
            .await
        {
            Ok(result) => {
                *self.shared_write_frontier.borrow_mut() = new_upper;
                return result;
            }
            Err(invalid_use) => panic!("compare_and_append failed: {invalid_use}"),
        }
    }

    fn upper(&self) -> &Antichain<Self::IntoTime> {
        self.write_handle.upper()
    }
}