mz_storage/source/reclock.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! The `ReclockOperator` observes the progress of a stream that is timestamped with some source
//! time `FromTime` and generates bindings that describe how the collection should evolve in
//! target time `IntoTime`.
use differential_dataflow::consolidation;
use differential_dataflow::lattice::Lattice;
use mz_persist_client::error::UpperMismatch;
use mz_repr::Diff;
use mz_storage_client::util::remap_handle::RemapHandle;
use timely::order::PartialOrder;
use timely::progress::Timestamp;
use timely::progress::frontier::{Antichain, AntichainRef, MutableAntichain};

pub mod compat;

/// The `ReclockOperator` is responsible for observing progress in the `FromTime` domain and for
/// consuming messages from a ticker of progress in the `IntoTime` domain. When the source frontier
/// advances and the ticker ticks, the `ReclockOperator` generates the data that describe this
/// correspondence and writes them out to its provided remap handle. The output generated by the
/// reclock operator can be thought of as `Collection<G, FromTime>` where `G::Timestamp` is
/// `IntoTime`.
///
/// The `ReclockOperator` always maintains the invariant that for any time `IntoTime` the remap
/// collection accumulates into an antichain in which each `FromTime` timestamp has frequency `1`.
/// In other words, the remap collection describes a well-formed `Antichain<FromTime>` as it
/// marches forward.
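///
/// As a schematic illustration (hypothetical values, with `FromTime` shown as a plain offset and
/// `IntoTime` as a millisecond timestamp), the remap collection could contain the updates
///
/// ```text
/// (offset 0, ms 0,    +1)  // the snapshot upper, reclocked to the minimum target time
/// (offset 0, ms 1000, -1)
/// (offset 4, ms 1000, +1)  // by target time 1000 the source had advanced to offset 4
/// ```
///
/// which accumulates to the antichain `{offset 0}` for times before `1000` and to `{offset 4}`
/// afterwards, so the invariant above holds at every `IntoTime`.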
#[derive(Debug)]
pub struct ReclockOperator<
    FromTime: Timestamp,
    IntoTime: Timestamp + Lattice,
    Handle: RemapHandle<FromTime = FromTime, IntoTime = IntoTime>,
> {
    /// Upper frontier of the partial remap trace
    upper: Antichain<IntoTime>,
    /// The upper frontier in terms of `FromTime`. Any attempt to reclock messages beyond this
    /// frontier will lead to minting new bindings.
    source_upper: MutableAntichain<FromTime>,

    /// A handle allowing this operator to publish updates to and read back from the remap collection
    remap_handle: Handle,
}

/// A batch of updates to the remap collection, along with the upper of the remap trace after
/// these updates have been applied.
#[derive(Clone, Debug, PartialEq)]
pub struct ReclockBatch<FromTime, IntoTime> {
    pub updates: Vec<(FromTime, IntoTime, Diff)>,
    pub upper: Antichain<IntoTime>,
}

impl<FromTime, IntoTime, Handle> ReclockOperator<FromTime, IntoTime, Handle>
where
    FromTime: Timestamp,
    IntoTime: Timestamp + Lattice,
    Handle: RemapHandle<FromTime = FromTime, IntoTime = IntoTime>,
{
    /// Construct a new [ReclockOperator] from the given remap handle
    pub async fn new(remap_handle: Handle) -> (Self, ReclockBatch<FromTime, IntoTime>) {
        let upper = remap_handle.upper().clone();

        let mut operator = Self {
            upper: Antichain::from_elem(IntoTime::minimum()),
            source_upper: MutableAntichain::new(),
            remap_handle,
        };

        // Load the initial state that might exist in the shard
        let trace_batch = if upper.elements() != [IntoTime::minimum()] {
            operator.sync(upper.borrow()).await
        } else {
            ReclockBatch {
                updates: vec![],
                upper: Antichain::from_elem(IntoTime::minimum()),
            }
        };

        (operator, trace_batch)
    }

    /// Syncs the state of this operator to match that of the persist shard up to the provided
    /// frontier
    async fn sync(
        &mut self,
        target_upper: AntichainRef<'_, IntoTime>,
    ) -> ReclockBatch<FromTime, IntoTime> {
        let mut updates: Vec<(FromTime, IntoTime, Diff)> = Vec::new();

        // Tail the remap collection until we reach the target upper frontier. Note that, in the
        // common case, we are also the writer, so we are waiting to read back what we wrote.
        while PartialOrder::less_than(&self.upper.borrow(), &target_upper) {
            let (mut batch, upper) = self
                .remap_handle
                .next()
                .await
                .expect("requested data after empty antichain");
            self.upper = upper;
            updates.append(&mut batch);
        }

        self.source_upper.update_iter(
            updates
                .iter()
                .map(|(src_ts, _dest_ts, diff)| (src_ts.clone(), diff.into_inner())),
        );

        ReclockBatch {
            updates,
            upper: self.upper.clone(),
        }
    }

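    /// Mints new bindings into the remap collection that associate the given `new_from_upper`
    /// with `binding_ts`, and advances the collection's upper to `new_into_upper`.
    ///
    /// At a high level: bindings are minted until this operator's view of the source upper covers
    /// `new_from_upper`; the very first binding is minted at the minimum `IntoTime` so the remap
    /// shard never appears empty; if the source frontier is closed the remap shard is closed as
    /// well; and if a concurrent writer wins the compare-and-append race the operator re-syncs
    /// and retries. The returned [`ReclockBatch`] contains all updates observed in the process,
    /// whether written by this operator or by others.
    ///
    /// # Panics
    ///
    /// Panics if `new_into_upper` is less than or equal to `binding_ts`.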
    pub async fn mint(
        &mut self,
        binding_ts: IntoTime,
        mut new_into_upper: Antichain<IntoTime>,
        new_from_upper: AntichainRef<'_, FromTime>,
    ) -> ReclockBatch<FromTime, IntoTime> {
        assert!(!new_into_upper.less_equal(&binding_ts));
        // The updates to the remap trace that occurred during minting.
        let mut batch = ReclockBatch {
            updates: vec![],
            upper: self.upper.clone(),
        };

        while *self.upper == [IntoTime::minimum()]
            || (PartialOrder::less_equal(&self.source_upper.frontier(), &new_from_upper)
                && PartialOrder::less_than(&self.upper, &new_into_upper)
                && self.upper.less_equal(&binding_ts))
        {
            // If source is closed, close remap shard as well.
            if new_from_upper.is_empty() {
                new_into_upper = Antichain::new();
            }

            // If this is the first binding we mint then we will mint it at the minimum target
            // timestamp. The first source upper is always the upper of the snapshot and by mapping
            // it to the minimum target timestamp we make it so that the final shard never appears
            // empty at any timestamp.
            let binding_ts = if *self.upper == [IntoTime::minimum()] {
                IntoTime::minimum()
            } else {
                binding_ts.clone()
            };

            let mut updates = vec![];
            for src_ts in self.source_upper.frontier().iter().cloned() {
                updates.push((src_ts, binding_ts.clone(), Diff::MINUS_ONE));
            }
            for src_ts in new_from_upper.iter().cloned() {
                updates.push((src_ts, binding_ts.clone(), Diff::ONE));
            }
            consolidation::consolidate_updates(&mut updates);

            let new_batch = match self.append_batch(updates, &new_into_upper).await {
                Ok(trace_batch) => trace_batch,
                Err(UpperMismatch { current, .. }) => self.sync(current.borrow()).await,
            };
            batch.updates.extend(new_batch.updates);
            batch.upper = new_batch.upper;
        }

        batch
    }

    /// Appends the provided updates to the remap collection and advances its upper to
    /// `new_upper`, updating this operator's in-memory state accordingly.
    ///
    /// If an attempt to mint bindings fails due to another process having raced and appended
    /// bindings concurrently then the current global upper will be returned as an error. This is
    /// the frontier that this operator must be synced to for a future append attempt to have any
    /// chance of success.
    async fn append_batch(
        &mut self,
        updates: Vec<(FromTime, IntoTime, Diff)>,
        new_upper: &Antichain<IntoTime>,
    ) -> Result<ReclockBatch<FromTime, IntoTime>, UpperMismatch<IntoTime>> {
        match self
            .remap_handle
            .compare_and_append(updates, self.upper.clone(), new_upper.clone())
            .await
        {
            // We have successfully produced data in the remap collection so let's read back what
            // we wrote to update our local state
            Ok(()) => Ok(self.sync(new_upper.borrow()).await),
            Err(mismatch) => Err(mismatch),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::cell::RefCell;
    use std::rc::Rc;
    use std::str::FromStr;
    use std::sync::Arc;
    use std::sync::LazyLock;
    use std::time::Duration;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{GlobalId, RelationDesc, ScalarType, Timestamp};
    use mz_storage_client::util::remap_handle::RemapHandle;
    use mz_storage_types::StorageDiff;
    use mz_storage_types::controller::CollectionMetadata;
    use mz_storage_types::sources::kafka::{self, RangeBound as RB};
    use mz_storage_types::sources::{MzOffset, SourceData};
    use mz_timely_util::order::Partitioned;
    use timely::progress::Timestamp as _;
    use tokio::sync::watch;

    use super::*;

    // 15 minutes
    static PERSIST_READER_LEASE_TIMEOUT_MS: Duration = Duration::from_secs(60 * 15);

    static PERSIST_CACHE: LazyLock<Arc<PersistClientCache>> = LazyLock::new(|| {
        let persistcfg = PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
        persistcfg.set_reader_lease_duration(PERSIST_READER_LEASE_TIMEOUT_MS);
        Arc::new(PersistClientCache::new(
            persistcfg,
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        ))
    });

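    // Schema of the progress updates written to the remap shard in these tests: a "partition"
    // column holding a numeric range and a nullable u64 "offset" column, which together are
    // assumed here to encode the `Partitioned<RangeBound, MzOffset>` timestamps built by
    // `partitioned_frontier` below.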
    static PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
        RelationDesc::builder()
            .with_column(
                "partition",
                ScalarType::Range {
                    element_type: Box::new(ScalarType::Numeric { max_scale: None }),
                }
                .nullable(false),
            )
            .with_column("offset", ScalarType::UInt64.nullable(true))
            .finish()
    });

    async fn make_test_operator(
        shard: ShardId,
        as_of: Antichain<Timestamp>,
    ) -> (
        ReclockOperator<
            kafka::KafkaTimestamp,
            Timestamp,
            impl RemapHandle<FromTime = kafka::KafkaTimestamp, IntoTime = Timestamp>,
        >,
        ReclockBatch<kafka::KafkaTimestamp, Timestamp>,
    ) {
        let metadata = CollectionMetadata {
            persist_location: PersistLocation {
                blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            },
            remap_shard: Some(shard),
            data_shard: ShardId::new(),
            relation_desc: RelationDesc::empty(),
            txns_shard: None,
        };

        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));

        // Always in read-write mode for tests.
        let (_read_only_tx, read_only_rx) = watch::channel(false);
        let remap_handle = crate::source::reclock::compat::PersistHandle::new(
            Arc::clone(&*PERSIST_CACHE),
            read_only_rx,
            metadata,
            as_of.clone(),
            write_frontier,
            GlobalId::Explain,
            "unittest",
            0,
            1,
            PROGRESS_DESC.clone(),
            GlobalId::Explain,
        )
        .await
        .unwrap();

        let (mut operator, mut initial_batch) = ReclockOperator::new(remap_handle).await;

        // If the remap shard is still empty, mint an initial binding. In the tests we always
        // reclock the minimum source frontier to the minimum target frontier, which we do in
        // this step.
        if *initial_batch.upper == [Timestamp::minimum()] {
            initial_batch = operator
                .mint(
                    0.into(),
                    Antichain::from_elem(1.into()),
                    Antichain::from_elem(Partitioned::minimum()).borrow(),
                )
                .await;
        }

        (operator, initial_batch)
    }

    /// Generates an antichain of [`kafka::KafkaTimestamp`]s in which all the provided partitions
    /// are at the specified offset and the gaps in between are filled with range timestamps at
    /// offset zero.
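    ///
    /// For example (values chosen purely for illustration),
    /// `partitioned_frontier([(0, MzOffset::from(4))])` yields an antichain with the three
    /// elements
    ///
    /// ```text
    /// Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0))
    /// Partitioned::new_singleton(RB::exact(0), MzOffset::from(4))
    /// Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0))
    /// ```
    ///
    /// matching the elements that appear in `test_basic_usage`'s expected updates.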
    fn partitioned_frontier<I>(items: I) -> Antichain<kafka::KafkaTimestamp>
    where
        I: IntoIterator<Item = (i32, MzOffset)>,
    {
        let mut frontier = Antichain::new();
        let mut prev = RB::NegInfinity;
        for (pid, offset) in items {
            assert!(prev < RB::before(pid));
            let gap = Partitioned::new_range(prev, RB::before(pid), MzOffset::from(0));
            frontier.extend([gap, Partitioned::new_singleton(RB::exact(pid), offset)]);
            prev = RB::after(pid);
        }
        frontier.insert(Partitioned::new_range(
            prev,
            RB::PosInfinity,
            MzOffset::from(0),
        ));
        frontier
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    async fn test_basic_usage() {
        let (mut operator, _) =
            make_test_operator(ShardId::new(), Antichain::from_elem(0.into())).await;

        // Mint a binding that reclocks offsets 0 through 3 to timestamp 1000
        let source_upper = partitioned_frontier([(0, MzOffset::from(4))]);
        let mut batch = operator
            .mint(
                1000.into(),
                Antichain::from_elem(1001.into()),
                source_upper.borrow(),
            )
            .await;
        let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
            updates: vec![
                (
                    Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::NegInfinity, RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::MINUS_ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(4)),
                    1000.into(),
                    Diff::ONE,
                ),
            ],
            upper: Antichain::from_elem(Timestamp::from(1001)),
        };
        batch.updates.sort();
        expected_batch.updates.sort();
        assert_eq!(batch, expected_batch);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    async fn test_compaction() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };

        let remap_shard = ShardId::new();

        let persist_client = PERSIST_CACHE
            .open(persist_location)
            .await
            .expect("error creating persist client");

        let mut remap_read_handle = persist_client
            .open_critical_since::<SourceData, (), Timestamp, StorageDiff, u64>(
                remap_shard,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::from_purpose("test_since_hold"),
            )
            .await
            .expect("error opening persist shard");

        let (mut operator, _batch) =
            make_test_operator(remap_shard, Antichain::from_elem(0.into())).await;

        // Mint a few bindings
        let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
        operator
            .mint(
                1000.into(),
                Antichain::from_elem(1001.into()),
                source_upper.borrow(),
            )
            .await;

        let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
        operator
            .mint(
                2000.into(),
                Antichain::from_elem(2001.into()),
                source_upper.borrow(),
            )
            .await;

        // Compact enough so that offsets >= 3 remain uncompacted
        remap_read_handle
            .compare_and_downgrade_since(&0, (&0, &Antichain::from_elem(1000.into())))
            .await
            .unwrap();

        // Starting a new operator with an `as_of` is the same as having compacted
        let (_operator, mut initial_batch) =
            make_test_operator(remap_shard, Antichain::from_elem(1000.into())).await;

        let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
            updates: vec![
                (
                    Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                    2000.into(),
                    Diff::MINUS_ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(5)),
                    2000.into(),
                    Diff::ONE,
                ),
            ],
            upper: Antichain::from_elem(Timestamp::from(2001)),
        };
        expected_batch.updates.sort();
        initial_batch.updates.sort();
        assert_eq!(initial_batch, expected_batch);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    async fn test_concurrency() {
        // Create two operators pointing to the same shard
        let shared_shard = ShardId::new();
        let (mut op_a, _) = make_test_operator(shared_shard, Antichain::from_elem(0.into())).await;
        let (mut op_b, _) = make_test_operator(shared_shard, Antichain::from_elem(0.into())).await;

        // Mint some bindings through operator A
        let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
        let mut batch = op_a
            .mint(
                1000.into(),
                Antichain::from_elem(1001.into()),
                source_upper.borrow(),
            )
            .await;
        let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
            updates: vec![
                (
                    Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::NegInfinity, RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::MINUS_ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                    1000.into(),
                    Diff::ONE,
                ),
            ],
            upper: Antichain::from_elem(Timestamp::from(1001)),
        };
        batch.updates.sort();
        expected_batch.updates.sort();
        assert_eq!(batch, expected_batch);

        // Operator B should attempt to mint in one go, fail, re-sync, and retry only for the
        // bindings that still need minting
        let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
        let mut batch = op_b
            .mint(
                11000.into(),
                Antichain::from_elem(11001.into()),
                source_upper.borrow(),
            )
            .await;
        expected_batch.updates.extend([
            (
                Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                11000.into(),
                Diff::MINUS_ONE,
            ),
            (
                Partitioned::new_singleton(RB::exact(0), MzOffset::from(5)),
                11000.into(),
                Diff::ONE,
            ),
        ]);
        expected_batch.upper = Antichain::from_elem(Timestamp::from(11001));
        batch.updates.sort();
        expected_batch.updates.sort();
        assert_eq!(batch, expected_batch);
    }

    // Regression test for
    // https://github.com/MaterializeInc/database-issues/issues/4216.
    #[mz_ore::test(tokio::test(start_paused = true))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    async fn test_since_hold() {
        let binding_shard = ShardId::new();

        let (mut operator, _) =
            make_test_operator(binding_shard, Antichain::from_elem(0.into())).await;

        // We do multiple rounds of minting. This will downgrade the since of
        // the internal listen. If we didn't make sure to also heartbeat the
        // internal handle that holds back the overall remap since, the checks
        // below would fail.
        //
        // We do two rounds and advance the time by half the lease timeout in
        // between so that the "listen handle" will not time out but the internal
        // handle used for holding back the since would otherwise time out.

        tokio::time::advance(PERSIST_READER_LEASE_TIMEOUT_MS / 2 + Duration::from_millis(1)).await;
        let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
        let _ = operator
            .mint(
                1000.into(),
                Antichain::from_elem(1001.into()),
                source_upper.borrow(),
            )
            .await;

        tokio::time::advance(PERSIST_READER_LEASE_TIMEOUT_MS / 2 + Duration::from_millis(1)).await;
        let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
        let _ = operator
            .mint(
                2000.into(),
                Antichain::from_elem(2001.into()),
                source_upper.borrow(),
            )
            .await;

        // Allow time for background maintenance work, which does lease
        // expiration. 1 ms is enough here, we just need to yield to allow the
        // background task to be "scheduled".
        tokio::time::sleep(Duration::from_millis(1)).await;

        // Start a new operator with an `as_of` of `0` to verify that holding
        // back the `since` of the remap shard works as expected.
        let (_operator, _batch) =
            make_test_operator(binding_shard, Antichain::from_elem(0.into())).await;

        // Also manually assert the since of the remap shard.
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };

        let persist_client = PERSIST_CACHE
            .open(persist_location)
            .await
            .expect("error creating persist client");

        let read_handle = persist_client
            .open_leased_reader::<SourceData, (), Timestamp, StorageDiff>(
                binding_shard,
                Arc::new(PROGRESS_DESC.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("test_since_hold"),
                true,
            )
            .await
            .expect("error opening persist shard");

        assert_eq!(
            Antichain::from_elem(0.into()),
            read_handle.since().to_owned()
        );
    }
621}