mz_storage/source/reclock/compat.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reclocking compatibility code until the whole ingestion pipeline is transformed to native
//! timestamps.

use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::stream::LocalBoxStream;
use mz_ore::soft_panic_or_log;
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::read::ListenEvent;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, RelationDesc};
use mz_storage_client::util::remap_handle::{RemapHandle, RemapHandleReader};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::sources::{SourceData, SourceTimestamp};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::Timestamp;
use timely::progress::frontier::Antichain;
use tokio::sync::watch;
/// A handle to a persist shard that stores remap bindings
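///
/// The handle implements [`RemapHandleReader`] for following the shard and
/// [`RemapHandle`] for appending new bindings. A minimal usage sketch, assuming
/// `handle` is an already-constructed `PersistHandle`:
///
/// ```ignore
/// // Read successive batches of remap bindings and the frontier up to which
/// // they are complete.
/// while let Some((bindings, upper)) = handle.next().await {
///     // ... reclock source updates using `bindings` up to `upper` ...
/// }
/// ```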
pub struct PersistHandle<FromTime: SourceTimestamp, IntoTime: Timestamp + Lattice + Codec64> {
    events: LocalBoxStream<
        'static,
        ListenEvent<
            IntoTime,
            (
                (Result<SourceData, String>, Result<(), String>),
                IntoTime,
                StorageDiff,
            ),
        >,
    >,
    write_handle: WriteHandle<SourceData, (), IntoTime, StorageDiff>,
    /// Whether or not this handle is in read-only mode.
    read_only_rx: watch::Receiver<bool>,
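    /// Updates read from the shard that are buffered until a progress event
    /// allows them to be emitted as a batch.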
    pending_batch: Vec<(FromTime, IntoTime, Diff)>,
    /// Reports `self`'s write frontier.
    shared_write_frontier: Rc<RefCell<Antichain<IntoTime>>>,
}

impl<FromTime: Timestamp, IntoTime: Timestamp + Sync> PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + TotalOrder + Lattice + Codec64,
{
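    /// Opens a `PersistHandle` on the remap shard described by
    /// `remap_metadata`, reading from `as_of` onwards.
    ///
    /// Returns an error if the shard's `since` has advanced past `as_of`.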
    pub async fn new(
        persist_clients: Arc<PersistClientCache>,
        read_only_rx: watch::Receiver<bool>,
        remap_metadata: CollectionMetadata,
        as_of: Antichain<IntoTime>,
        shared_write_frontier: Rc<RefCell<Antichain<IntoTime>>>,
        // Additional information to improve logging.
        id: GlobalId,
        operator: &str,
        worker_id: usize,
        worker_count: usize,
        // Must match the `FromTime`. Ideally we would be able to discover this
        // from `SourceTimestamp`, but each source would need a specific `SourceTimestamp`
        // implementation, as they do not share remap `RelationDesc`s (column names
        // are different).
        //
        // TODO(guswynn): use the type-system to prevent misuse here.
        remap_relation_desc: RelationDesc,
        remap_collection_id: GlobalId,
    ) -> anyhow::Result<Self> {
        let persist_client = persist_clients
            .open(remap_metadata.persist_location.clone())
            .await
            .context("error creating persist client")?;

        let (write_handle, mut read_handle) = persist_client
            .open(
                remap_metadata.data_shard,
                Arc::new(remap_relation_desc),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: remap_collection_id.to_string(),
                    handle_purpose: format!("reclock for {}", id),
                },
                false,
            )
            .await
            .expect("invalid usage");

        let upper = write_handle.upper();
        // We want a leased reader because elsewhere in the code the `as_of`
        // time may also be determined by another `ReadHandle`, and the pair of
        // them offer the invariant that we need (that this `since` is <= the
        // `as_of`). Using a `SinceHandle` here does not offer the same
        // invariant when paired with a `ReadHandle`.
        let since = read_handle.since();

        // Allow manually simulating the scenario where the since of the remap
        // shard has advanced too far.
        fail_point!("invalid_remap_as_of");

        if since.is_empty() {
            // This can happen when, say, a source is being dropped but the
            // cluster is busy and only notices later. In that case we may
            // still try to render an ingestion that is no longer valid and
            // whose shards are no longer valid to use.
            //
            // This is a rare race condition that is expected to happen every
            // now and then; it's not a bug in how things currently work.
            tracing::info!(
                source_id = %id,
                %worker_id,
                "since of remap shard is the empty antichain, suspending...");

            // We wait 5 hours to give the commands a chance to arrive at this
            // replica and drop our dataflow.
            tokio::time::sleep(Duration::from_secs(5 * 60 * 60)).await;

            // If we're still here after 5 hours, something has gone wrong and
            // we complain.
            soft_panic_or_log!(
                "since of remap shard is the empty antichain, source_id = {id}, worker_id = {worker_id}"
            );
        }

        if !PartialOrder::less_equal(since, &as_of) {
            anyhow::bail!(
                "invalid as_of: as_of({as_of:?}) < since({since:?}), \
                source {id}, \
                remap_shard: {:?}",
                remap_metadata.data_shard
            );
        }

        assert!(
            as_of.elements() == [IntoTime::minimum()] || PartialOrder::less_than(&as_of, upper),
            "invalid as_of: upper({upper:?}) <= as_of({as_of:?})",
        );

        tracing::info!(
            ?since,
            ?as_of,
            ?upper,
            "{operator}({id}) {worker_id}/{worker_count} initializing PersistHandle"
        );

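        // Construct the stream of events: first a snapshot of the shard as of
        // `as_of`, then a listener that follows the shard from there onwards.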
        use futures::stream;
        let events = stream::once(async move {
            let updates = read_handle
                .snapshot_and_fetch(as_of.clone())
                .await
                .expect("since <= as_of asserted");
            let snapshot = stream::once(std::future::ready(ListenEvent::Updates(updates)));

            let listener = read_handle
                .listen(as_of.clone())
                .await
                .expect("since <= as_of asserted");

            let listen_stream = stream::unfold(listener, |mut listener| async move {
                let events = stream::iter(listener.fetch_next().await);
                Some((events, listener))
            })
            .flatten();

            snapshot.chain(listen_stream)
        })
        .flatten()
        .boxed_local();

        Ok(Self {
            events,
            write_handle,
            read_only_rx,
            pending_batch: vec![],
            shared_write_frontier,
        })
    }
}

#[async_trait::async_trait(?Send)]
impl<FromTime, IntoTime> RemapHandleReader for PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + Lattice + Codec64,
{
    type FromTime = FromTime;
    type IntoTime = IntoTime;

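    /// Waits for the next progress event and returns the batch of buffered
    /// remap bindings whose timestamps are not beyond the new upper, along
    /// with that upper. Returns `None` when the event stream is exhausted.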
    async fn next(
        &mut self,
    ) -> Option<(
        Vec<(Self::FromTime, Self::IntoTime, Diff)>,
        Antichain<Self::IntoTime>,
    )> {
        while let Some(event) = self.events.next().await {
            match event {
                ListenEvent::Progress(new_upper) => {
                    // Peel off a batch of pending data.
                    let batch = self
                        .pending_batch
                        .extract_if(.., |(_, ts, _)| !new_upper.less_equal(ts))
                        .collect();
                    return Some((batch, new_upper));
                }
                ListenEvent::Updates(msgs) => {
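                    // Decode each remap binding from its row representation
                    // and buffer it until the frontier advances past it.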
                    for ((update, _), into_ts, diff) in msgs {
                        let from_ts = FromTime::decode_row(
                            &update.expect("invalid row").0.expect("invalid row"),
                        );
                        self.pending_batch.push((from_ts, into_ts, diff.into()));
                    }
                }
            }
        }
        None
    }
}

#[async_trait::async_trait(?Send)]
impl<FromTime, IntoTime> RemapHandle for PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
{
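    /// Appends `updates` to the remap shard, advancing its upper from `upper`
    /// to `new_upper`. While in read-only mode, this blocks until the handle
    /// either leaves read-only mode or observes someone else advancing the
    /// upper, in which case it returns an `UpperMismatch`.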
    async fn compare_and_append(
        &mut self,
        updates: Vec<(Self::FromTime, Self::IntoTime, Diff)>,
        upper: Antichain<Self::IntoTime>,
        new_upper: Antichain<Self::IntoTime>,
    ) -> Result<(), UpperMismatch<Self::IntoTime>> {
        if *self.read_only_rx.borrow() {
            // We have to wait for either us coming out of read-only mode or
            // someone else advancing the upper. If we just returned an
            // `UpperMismatch` while in read-only mode, we would presumably go
            // into a busy loop, because we'd be called over and over again.

            loop {
                tracing::trace!(
                    ?upper,
                    ?new_upper,
                    persist_upper = ?self.write_handle.upper(),
                    "persist remap handle is in read-only mode, waiting until we come out of it or the shard upper advances");

                // We don't try to be too smart here by, for example, using
                // `wait_for_upper_past()`: we'd have to use a select!, which
                // would require cancel safety of `wait_for_upper_past()`, and
                // it doesn't advertise that.
                let _ =
                    tokio::time::timeout(Duration::from_secs(1), self.read_only_rx.changed()).await;

                if !*self.read_only_rx.borrow() {
                    tracing::trace!(
                        ?upper,
                        ?new_upper,
                        persist_upper = ?self.write_handle.upper(),
                        "persist remap handle has come out of read-only mode"
                    );

                    // It's okay to write now.
                    break;
                }

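                // Check whether someone else advanced the shard's upper while
                // we were waiting.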
                let current_upper = self.write_handle.fetch_recent_upper().await;

                if PartialOrder::less_than(&upper, current_upper) {
                    tracing::trace!(
                        ?upper,
                        ?new_upper,
                        persist_upper = ?current_upper,
                        "someone else advanced the upper, aborting write"
                    );

                    return Err(UpperMismatch {
                        current: current_upper.clone(),
                        expected: upper,
                    });
                }
            }
        }

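        // Encode the source timestamps into their row representation for
        // persist.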
        let row_updates = updates.into_iter().map(|(from_ts, into_ts, diff)| {
            (
                (SourceData(Ok(from_ts.encode_row())), ()),
                into_ts,
                diff.into_inner(),
            )
        });

        match self
            .write_handle
            .compare_and_append(row_updates, upper, new_upper.clone())
            .await
        {
            Ok(result) => {
                *self.shared_write_frontier.borrow_mut() = new_upper;
                return result;
            }
            Err(invalid_use) => panic!("compare_and_append failed: {invalid_use}"),
        }
    }

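    /// Reports the current upper of the underlying write handle.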
    fn upper(&self) -> &Antichain<Self::IntoTime> {
        self.write_handle.upper()
    }
}