mz_storage/source/reclock/compat.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reclocking compatibility code until the whole ingestion pipeline is transformed to native
//! timestamps
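//!
//! As an orienting sketch (illustration, not code from this module): reclocking
//! maintains *remap bindings*, differential updates of shape
//! `(FromTime, IntoTime, Diff)` stored in a persist shard. For a hypothetical
//! source whose `FromTime` is a partition offset, the bindings might evolve as:
//!
//! ```text
//! (offset(42), 1000, +1)  // offset 42 becomes valid at target time 1000
//! (offset(42), 2000, -1)  // ...and is retracted at target time 2000,
//! (offset(57), 2000, +1)  // superseded by a binding for offset 57
//! ```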

use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use differential_dataflow::lattice::Lattice;
use fail::fail_point;
use futures::StreamExt;
use futures::stream::LocalBoxStream;
use mz_ore::soft_panic_or_log;
use mz_ore::vec::VecExt;
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::error::UpperMismatch;
use mz_persist_client::read::ListenEvent;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::Codec64;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, RelationDesc};
use mz_storage_client::util::remap_handle::{RemapHandle, RemapHandleReader};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::sources::{SourceData, SourceTimestamp};
use timely::order::PartialOrder;
use timely::progress::Timestamp;
use timely::progress::frontier::Antichain;
use tokio::sync::watch;

/// A handle to a persist shard that stores remap bindings
pub struct PersistHandle<FromTime: SourceTimestamp, IntoTime: Timestamp + Lattice + Codec64> {
    events: LocalBoxStream<
        'static,
        ListenEvent<
            IntoTime,
            (
                (Result<SourceData, String>, Result<(), String>),
                IntoTime,
                StorageDiff,
            ),
        >,
    >,
    write_handle: WriteHandle<SourceData, (), IntoTime, StorageDiff>,
    /// Whether or not this handle is in read-only mode.
    read_only_rx: watch::Receiver<bool>,
    pending_batch: Vec<(FromTime, IntoTime, Diff)>,
    /// Reports `self`'s write frontier.
    shared_write_frontier: Rc<RefCell<Antichain<IntoTime>>>,
}

impl<FromTime: Timestamp, IntoTime: Timestamp + Sync> PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + Lattice + Codec64,
{
    pub async fn new(
        persist_clients: Arc<PersistClientCache>,
        read_only_rx: watch::Receiver<bool>,
        metadata: CollectionMetadata,
        as_of: Antichain<IntoTime>,
        shared_write_frontier: Rc<RefCell<Antichain<IntoTime>>>,
        // Additional information to improve logging.
        id: GlobalId,
        operator: &str,
        worker_id: usize,
        worker_count: usize,
        // Must match the `FromTime`. Ideally we would be able to discover this
        // from `SourceTimestamp`, but each source would need a specific
        // `SourceTimestamp` implementation, as they do not share remap
        // `RelationDesc`'s (column names are different).
        //
        // TODO(guswynn): use the type-system to prevent misuse here.
        remap_relation_desc: RelationDesc,
        remap_collection_id: GlobalId,
    ) -> anyhow::Result<Self> {
        let remap_shard = if let Some(remap_shard) = metadata.remap_shard {
            remap_shard
        } else {
            panic!(
                "cannot create remap PersistHandle for collection without remap shard: {id}, metadata: {:?}",
                metadata
            );
        };

        let persist_client = persist_clients
            .open(metadata.persist_location.clone())
            .await
            .context("error creating persist client")?;

        let (write_handle, mut read_handle) = persist_client
            .open(
                remap_shard,
                Arc::new(remap_relation_desc),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: remap_collection_id.to_string(),
                    handle_purpose: format!("reclock for {}", id),
                },
                false,
            )
            .await
            .expect("invalid usage");

        let upper = write_handle.upper();
        // We want a leased reader because elsewhere in the code the `as_of`
        // time may also be determined by another `ReadHandle`, and the pair of
        // them offer the invariant that we need (that this `since` is <= the
        // `as_of`). Using a `SinceHandle` here does not offer the same
        // invariant when paired with a `ReadHandle`.
        let since = read_handle.since();

        // Allow manually simulating the scenario where the since of the remap
        // shard has advanced too far.
        fail_point!("invalid_remap_as_of");

        if since.is_empty() {
            // This can happen when, say, a source is being dropped but the
            // cluster is busy and only notices later. In that case we may
            // still try to render an ingestion that is no longer valid and
            // whose shards are no longer valid to use.
            //
            // This is a rare race condition that is expected to happen every
            // now and then; it is not a bug in how things currently work.
            tracing::info!(
                source_id = %id,
                %worker_id,
                "since of remap shard is the empty antichain, suspending...");

            // We wait 5 hours to give commands a chance to arrive at this
            // replica so that it can drop our dataflow.
            tokio::time::sleep(Duration::from_secs(5 * 60 * 60)).await;

            // If we're still here after 5 hours, something has gone wrong and
            // we complain.
            soft_panic_or_log!(
                "since of remap shard is the empty antichain, source_id = {id}, worker_id = {worker_id}"
            );
        }

        if !PartialOrder::less_equal(since, &as_of) {
            anyhow::bail!(
                "invalid as_of: as_of({as_of:?}) < since({since:?}), \
                source {id}, \
                remap_shard: {:?}",
                metadata.remap_shard
            );
        }

        assert!(
            as_of.elements() == [IntoTime::minimum()] || PartialOrder::less_than(&as_of, upper),
            "invalid as_of: upper({upper:?}) <= as_of({as_of:?})",
        );

        tracing::info!(
            ?since,
            ?as_of,
            ?upper,
            "{operator}({id}) {worker_id}/{worker_count} initializing PersistHandle"
        );

        use futures::stream;
        let events = stream::once(async move {
            let updates = read_handle
                .snapshot_and_fetch(as_of.clone())
                .await
                .expect("since <= as_of asserted");
            let snapshot = stream::once(std::future::ready(ListenEvent::Updates(updates)));

            let listener = read_handle
                .listen(as_of.clone())
                .await
                .expect("since <= as_of asserted");

            let listen_stream = stream::unfold(listener, |mut listener| async move {
                let events = stream::iter(listener.fetch_next().await);
                Some((events, listener))
            })
            .flatten();

            snapshot.chain(listen_stream)
        })
        .flatten()
        .boxed_local();

        Ok(Self {
            events,
            write_handle,
            read_only_rx,
            pending_batch: vec![],
            shared_write_frontier,
        })
    }
}
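
// Illustration only (not part of the original file): a hedged sketch of how a
// caller might construct a `PersistHandle`, with `u64` standing in for
// `IntoTime`. All parameters are assumed to come from the surrounding
// ingestion machinery; error handling and operator wiring are elided.
#[allow(dead_code)]
async fn new_remap_handle_sketch<FromTime: SourceTimestamp>(
    persist_clients: Arc<PersistClientCache>,
    metadata: CollectionMetadata,
    as_of: Antichain<u64>,
    id: GlobalId,
    worker_id: usize,
    worker_count: usize,
    remap_relation_desc: RelationDesc,
    remap_collection_id: GlobalId,
) -> anyhow::Result<PersistHandle<FromTime, u64>> {
    // Start in read-only mode; a real caller keeps the sender alive and flips
    // it to `false` once this replica is allowed to write remap bindings.
    let (_read_only_tx, read_only_rx) = watch::channel(true);
    // The write frontier is shared with the reclock operator via `Rc<RefCell<_>>`.
    let shared_write_frontier = Rc::new(RefCell::new(Antichain::from_elem(0u64)));
    PersistHandle::new(
        persist_clients,
        read_only_rx,
        metadata,
        as_of,
        Rc::clone(&shared_write_frontier),
        id,
        "remap_sketch",
        worker_id,
        worker_count,
        remap_relation_desc,
        remap_collection_id,
    )
    .await
}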

#[async_trait::async_trait(?Send)]
impl<FromTime, IntoTime> RemapHandleReader for PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + Lattice + Codec64,
{
    type FromTime = FromTime;
    type IntoTime = IntoTime;

    async fn next(
        &mut self,
    ) -> Option<(
        Vec<(Self::FromTime, Self::IntoTime, Diff)>,
        Antichain<Self::IntoTime>,
    )> {
        while let Some(event) = self.events.next().await {
            match event {
                ListenEvent::Progress(new_upper) => {
                    // Peel off a batch of pending data.
                    let batch = self
                        .pending_batch
                        .drain_filter_swapping(|(_, ts, _)| !new_upper.less_equal(ts))
                        .collect();
                    return Some((batch, new_upper));
                }
                ListenEvent::Updates(msgs) => {
                    for ((update, _), into_ts, diff) in msgs {
                        let from_ts = FromTime::decode_row(
                            &update.expect("invalid row").0.expect("invalid row"),
                        );
                        self.pending_batch.push((from_ts, into_ts, diff.into()));
                    }
                }
            }
        }
        None
    }
}
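
// Illustration only (not part of the original file): a hedged sketch of how a
// consumer might drive `RemapHandleReader::next`, applying each batch of remap
// bindings up to the reported upper. `apply_bindings` is a hypothetical
// callback standing in for the real reclocking logic.
#[allow(dead_code)]
async fn drain_remap_bindings_sketch<FromTime, IntoTime>(
    mut handle: PersistHandle<FromTime, IntoTime>,
    mut apply_bindings: impl FnMut(Vec<(FromTime, IntoTime, Diff)>, &Antichain<IntoTime>),
) where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + Lattice + Codec64,
{
    // `next` yields `None` only when the underlying listen stream terminates.
    while let Some((batch, upper)) = handle.next().await {
        // Every update in `batch` has a time not beyond `upper`; later updates
        // stay buffered in the handle's `pending_batch` until a subsequent
        // progress event releases them.
        apply_bindings(batch, &upper);
    }
}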

#[async_trait::async_trait(?Send)]
impl<FromTime, IntoTime> RemapHandle for PersistHandle<FromTime, IntoTime>
where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + Lattice + Codec64 + Sync,
{
    async fn compare_and_append(
        &mut self,
        updates: Vec<(Self::FromTime, Self::IntoTime, Diff)>,
        upper: Antichain<Self::IntoTime>,
        new_upper: Antichain<Self::IntoTime>,
    ) -> Result<(), UpperMismatch<Self::IntoTime>> {
        if *self.read_only_rx.borrow() {
            // We have to wait until either we come out of read-only mode or
            // someone else advances the upper. If we just returned an
            // `UpperMismatch` while in read-only mode, we would likely go into
            // a busy loop, being called over and over again.

            loop {
                tracing::trace!(
                    ?upper,
                    ?new_upper,
                    persist_upper = ?self.write_handle.upper(),
                    "persist remap handle is in read-only mode, waiting until we come out of it or the shard upper advances");

                // We don't try to be too smart here by, for example, using
                // `wait_for_upper_past()`: we'd have to use a `select!`, which
                // would require cancel safety of `wait_for_upper_past()`, and
                // it doesn't advertise that.
                let _ =
                    tokio::time::timeout(Duration::from_secs(1), self.read_only_rx.changed()).await;

                if !*self.read_only_rx.borrow() {
                    tracing::trace!(
                        ?upper,
                        ?new_upper,
                        persist_upper = ?self.write_handle.upper(),
                        "persist remap handle has come out of read-only mode"
                    );

                    // It's okay to write now.
                    break;
                }

                let current_upper = self.write_handle.fetch_recent_upper().await;

                if PartialOrder::less_than(&upper, current_upper) {
                    tracing::trace!(
                        ?upper,
                        ?new_upper,
                        persist_upper = ?current_upper,
                        "someone else advanced the upper, aborting write"
                    );

                    return Err(UpperMismatch {
                        current: current_upper.clone(),
                        expected: upper,
                    });
                }
            }
        }

        let row_updates = updates.into_iter().map(|(from_ts, into_ts, diff)| {
            (
                (SourceData(Ok(from_ts.encode_row())), ()),
                into_ts,
                diff.into_inner(),
            )
        });

        match self
            .write_handle
            .compare_and_append(row_updates, upper, new_upper.clone())
            .await
        {
            Ok(result) => {
                *self.shared_write_frontier.borrow_mut() = new_upper;
                return result;
            }
            Err(invalid_use) => panic!("compare_and_append failed: {invalid_use}"),
        }
    }

    fn upper(&self) -> &Antichain<Self::IntoTime> {
        self.write_handle.upper()
    }
}
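
// Illustration only (not part of the original file): a hedged sketch of the
// retry loop a writer might use around `RemapHandle::compare_and_append`,
// recovering from an `UpperMismatch` by adopting the current shard upper.
#[allow(dead_code)]
async fn append_bindings_sketch<FromTime, IntoTime>(
    handle: &mut PersistHandle<FromTime, IntoTime>,
    updates: Vec<(FromTime, IntoTime, Diff)>,
    new_upper: Antichain<IntoTime>,
) where
    FromTime: SourceTimestamp,
    IntoTime: Timestamp + Lattice + Codec64 + Sync,
{
    let mut expected_upper = handle.upper().clone();
    loop {
        match handle
            .compare_and_append(updates.clone(), expected_upper, new_upper.clone())
            .await
        {
            Ok(()) => return,
            // Someone else advanced the shard; adopt their upper and retry.
            // A real caller would also have to re-mint bindings beyond the
            // new frontier rather than blindly retrying the same `updates`.
            Err(UpperMismatch { current, .. }) => expected_upper = current,
        }
    }
}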