mz_storage/source/reclock/
compat.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Reclocking compatibility code until the whole ingestion pipeline is transformed to native
11//! timestamps
12
13use std::cell::RefCell;
14use std::rc::Rc;
15use std::sync::Arc;
16use std::time::Duration;
17
18use anyhow::Context;
19use differential_dataflow::lattice::Lattice;
20use fail::fail_point;
21use futures::StreamExt;
22use futures::stream::LocalBoxStream;
23use mz_ore::soft_panic_or_log;
24use mz_persist_client::Diagnostics;
25use mz_persist_client::cache::PersistClientCache;
26use mz_persist_client::error::UpperMismatch;
27use mz_persist_client::read::ListenEvent;
28use mz_persist_client::write::WriteHandle;
29use mz_persist_types::Codec64;
30use mz_persist_types::codec_impls::UnitSchema;
31use mz_repr::{Diff, GlobalId, RelationDesc};
32use mz_storage_client::util::remap_handle::{RemapHandle, RemapHandleReader};
33use mz_storage_types::StorageDiff;
34use mz_storage_types::controller::CollectionMetadata;
35use mz_storage_types::sources::{SourceData, SourceTimestamp};
36use timely::order::{PartialOrder, TotalOrder};
37use timely::progress::Timestamp;
38use timely::progress::frontier::Antichain;
39use tokio::sync::watch;
40
41/// A handle to a persist shard that stores remap bindings
42pub struct PersistHandle<FromTime: SourceTimestamp, IntoTime: Timestamp + Lattice + Codec64> {
43    events:
44        LocalBoxStream<'static, ListenEvent<IntoTime, ((SourceData, ()), IntoTime, StorageDiff)>>,
45    write_handle: WriteHandle<SourceData, (), IntoTime, StorageDiff>,
46    /// Whether or not this handle is in read-only mode.
47    read_only_rx: watch::Receiver<bool>,
48    pending_batch: Vec<(FromTime, IntoTime, Diff)>,
49    // Reports `self`'s write frontier.
50    shared_write_frontier: Rc<RefCell<Antichain<IntoTime>>>,
51}
52
53impl<FromTime: Timestamp, IntoTime: Timestamp + Sync> PersistHandle<FromTime, IntoTime>
54where
55    FromTime: SourceTimestamp,
56    IntoTime: Timestamp + TotalOrder + Lattice + Codec64,
57{
58    pub async fn new(
59        persist_clients: Arc<PersistClientCache>,
60        read_only_rx: watch::Receiver<bool>,
61        remap_metadata: CollectionMetadata,
62        as_of: Antichain<IntoTime>,
63        shared_write_frontier: Rc<RefCell<Antichain<IntoTime>>>,
64        // additional information to improve logging
65        id: GlobalId,
66        operator: &str,
67        worker_id: usize,
68        worker_count: usize,
69        // Must match the `FromTime`. Ideally we would be able to discover this
70        // from `SourceTimestamp`, but each source would need a specific `SourceTimestamp`
71        // implementation, as they do not share remap `RelationDesc`'s (columns names
72        // are different).
73        //
74        // TODO(guswynn): use the type-system to prevent misuse here.
75        remap_relation_desc: RelationDesc,
76        remap_collection_id: GlobalId,
77    ) -> anyhow::Result<Self> {
78        let persist_client = persist_clients
79            .open(remap_metadata.persist_location.clone())
80            .await
81            .context("error creating persist client")?;
82
83        let (write_handle, mut read_handle) = persist_client
84            .open(
85                remap_metadata.data_shard,
86                Arc::new(remap_relation_desc),
87                Arc::new(UnitSchema),
88                Diagnostics {
89                    shard_name: remap_collection_id.to_string(),
90                    handle_purpose: format!("reclock for {}", id),
91                },
92                false,
93            )
94            .await
95            .expect("invalid usage");
96
97        let upper = write_handle.upper();
98        // We want a leased reader because elsewhere in the code the `as_of`
99        // time may also be determined by another `ReadHandle`, and the pair of
100        // them offer the invariant that we need (that the `as_of` if <= this
101        // `since`). Using a `SinceHandle` here does not offer the same
102        // invariant when paired with a `ReadHandle`.
103        let since = read_handle.since();
104
105        // Allow manually simulating the scenario where the since of the remap
106        // shard has advanced too far.
107        fail_point!("invalid_remap_as_of");
108
109        if since.is_empty() {
110            // This can happen when, say, a source is being dropped but we on
111            // the cluster are busy and notice that only later. In those cases
112            // it can happen that we still try to render an ingestion that is
113            // not valid anymore and where the shards it uses are not valid to
114            // use anymore.
115            //
116            // This is a rare race condition and something that is expected to
117            // happen every now and then. It's not a bug in the current way of
118            // how things work.
119            tracing::info!(
120                source_id = %id,
121                %worker_id,
122                "since of remap shard is the empty antichain, suspending...");
123
124            // We wait 5 hours to give the commands a chance to arrive at this
125            // replica and for it to drop our dataflow.
126            tokio::time::sleep(Duration::from_secs(5 * 60 * 60)).await;
127
128            // If we're still here after 5 hours, something has gone wrong and
129            // we complain.
130            soft_panic_or_log!(
131                "since of remap shard is the empty antichain, source_id = {id}, worker_id = {worker_id}"
132            );
133        }
134
135        if !PartialOrder::less_equal(since, &as_of) {
136            anyhow::bail!(
137                "invalid as_of: as_of({as_of:?}) < since({since:?}), \
138                source {id}, \
139                remap_shard: {:?}",
140                remap_metadata.data_shard
141            );
142        }
143
144        assert!(
145            as_of.elements() == [IntoTime::minimum()] || PartialOrder::less_than(&as_of, upper),
146            "invalid as_of: upper({upper:?}) <= as_of({as_of:?})",
147        );
148
149        tracing::info!(
150            ?since,
151            ?as_of,
152            ?upper,
153            "{operator}({id}) {worker_id}/{worker_count} initializing PersistHandle"
154        );
155
156        use futures::stream;
157        let events = stream::once(async move {
158            let updates = read_handle
159                .snapshot_and_fetch(as_of.clone())
160                .await
161                .expect("since <= as_of asserted");
162            let snapshot = stream::once(std::future::ready(ListenEvent::Updates(updates)));
163
164            let listener = read_handle
165                .listen(as_of.clone())
166                .await
167                .expect("since <= as_of asserted");
168
169            let listen_stream = stream::unfold(listener, |mut listener| async move {
170                let events = stream::iter(listener.fetch_next().await);
171                Some((events, listener))
172            })
173            .flatten();
174
175            snapshot.chain(listen_stream)
176        })
177        .flatten()
178        .boxed_local();
179
180        Ok(Self {
181            events,
182            write_handle,
183            read_only_rx,
184            pending_batch: vec![],
185            shared_write_frontier,
186        })
187    }
188}
189
190#[async_trait::async_trait(?Send)]
191impl<FromTime, IntoTime> RemapHandleReader for PersistHandle<FromTime, IntoTime>
192where
193    FromTime: SourceTimestamp,
194    IntoTime: Timestamp + Lattice + Codec64,
195{
196    type FromTime = FromTime;
197    type IntoTime = IntoTime;
198
199    async fn next(
200        &mut self,
201    ) -> Option<(
202        Vec<(Self::FromTime, Self::IntoTime, Diff)>,
203        Antichain<Self::IntoTime>,
204    )> {
205        while let Some(event) = self.events.next().await {
206            match event {
207                ListenEvent::Progress(new_upper) => {
208                    // Peel off a batch of pending data
209                    let batch = self
210                        .pending_batch
211                        .extract_if(.., |(_, ts, _)| !new_upper.less_equal(ts))
212                        .collect();
213                    return Some((batch, new_upper));
214                }
215                ListenEvent::Updates(msgs) => {
216                    for ((update, _), into_ts, diff) in msgs {
217                        let from_ts = FromTime::decode_row(&update.0.expect("invalid row"));
218                        self.pending_batch.push((from_ts, into_ts, diff.into()));
219                    }
220                }
221            }
222        }
223        None
224    }
225}
226
227#[async_trait::async_trait(?Send)]
228impl<FromTime, IntoTime> RemapHandle for PersistHandle<FromTime, IntoTime>
229where
230    FromTime: SourceTimestamp,
231    IntoTime: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
232{
233    async fn compare_and_append(
234        &mut self,
235        updates: Vec<(Self::FromTime, Self::IntoTime, Diff)>,
236        upper: Antichain<Self::IntoTime>,
237        new_upper: Antichain<Self::IntoTime>,
238    ) -> Result<(), UpperMismatch<Self::IntoTime>> {
239        if *self.read_only_rx.borrow() {
240            // We have to wait for either us coming out of read-only mode or
241            // someone else advancing the upper. If we just returned an
242            // `UpperMismatch` while in read-only mode, we would go into a busy
243            // loop because we'd be called over and over again. One presumes.
244
245            loop {
246                tracing::trace!(
247                    ?upper,
248                    ?new_upper,
249                    persist_upper = ?self.write_handle.upper(),
250                    "persist remap handle is in read-only mode, waiting until we come out of it or the shard upper advances");
251
252                // We don't try to be too smart here, and for example use
253                // `wait_for_upper_past()`. We'd have to use a select!, which
254                // would require cancel safety of `wait_for_upper_past()`, which
255                // it doesn't advertise.
256                let _ =
257                    tokio::time::timeout(Duration::from_secs(1), self.read_only_rx.changed()).await;
258
259                if !*self.read_only_rx.borrow() {
260                    tracing::trace!(
261                        ?upper,
262                        ?new_upper,
263                        persist_upper = ?self.write_handle.upper(),
264                        "persist remap handle has come out of read-only mode"
265                    );
266
267                    // It's okay to write now.
268                    break;
269                }
270
271                let current_upper = self.write_handle.fetch_recent_upper().await;
272
273                if PartialOrder::less_than(&upper, current_upper) {
274                    tracing::trace!(
275                        ?upper,
276                        ?new_upper,
277                        persist_upper = ?current_upper,
278                        "someone else advanced the upper, aborting write"
279                    );
280
281                    return Err(UpperMismatch {
282                        current: current_upper.clone(),
283                        expected: upper,
284                    });
285                }
286            }
287        }
288
289        let row_updates = updates.into_iter().map(|(from_ts, into_ts, diff)| {
290            (
291                (SourceData(Ok(from_ts.encode_row())), ()),
292                into_ts,
293                diff.into_inner(),
294            )
295        });
296
297        match self
298            .write_handle
299            .compare_and_append(row_updates, upper, new_upper.clone())
300            .await
301        {
302            Ok(result) => {
303                *self.shared_write_frontier.borrow_mut() = new_upper;
304                return result;
305            }
306            Err(invalid_use) => panic!("compare_and_append failed: {invalid_use}"),
307        }
308    }
309
310    fn upper(&self) -> &Antichain<Self::IntoTime> {
311        self.write_handle.upper()
312    }
313}