mz_storage/source/
reclock.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! The `ReclockOperator` observes the progress of a stream that is
//! timestamped with some source time `FromTime` and generates bindings that describe how the
//! collection should evolve in target time `IntoTime`.
13use differential_dataflow::consolidation;
14use differential_dataflow::lattice::Lattice;
15use mz_persist_client::error::UpperMismatch;
16use mz_repr::Diff;
17use mz_storage_client::util::remap_handle::RemapHandle;
18use timely::order::PartialOrder;
19use timely::progress::Timestamp;
20use timely::progress::frontier::{Antichain, AntichainRef, MutableAntichain};
21
22pub mod compat;
23
24/// The `ReclockOperator` is responsible for observing progress in the `FromTime` domain and
25/// consume messages from a ticker of progress in the `IntoTime` domain. When the source frontier
26/// advances and the ticker ticks the `ReclockOperator` will generate the data that describe this
27/// correspondence and write them out to its provided remap handle. The output generated by the
28/// reclock operator can be thought of as `Collection<G, FromTime>` where `G::Timestamp` is
29/// `IntoTime`.
30///
31/// The `ReclockOperator` will always maintain the invariant that for any time `IntoTime` the remap
32/// collection accumulates into an Antichain where each `FromTime` timestamp has frequency `1`. In
33/// other words the remap collection describes a well formed `Antichain<FromTime>` as it is
34/// marching forwards.
35#[derive(Debug)]
36pub struct ReclockOperator<
37    FromTime: Timestamp,
38    IntoTime: Timestamp + Lattice,
39    Handle: RemapHandle<FromTime = FromTime, IntoTime = IntoTime>,
40> {
41    /// Upper frontier of the partial remap trace
42    upper: Antichain<IntoTime>,
43    /// The upper frontier in terms of `FromTime`. Any attempt to reclock messages beyond this
44    /// frontier will lead to minting new bindings.
45    source_upper: MutableAntichain<FromTime>,
46
47    /// A handle allowing this operator to publish updates to and read back from the remap collection
48    remap_handle: Handle,
49}
50
51#[derive(Clone, Debug, PartialEq)]
52pub struct ReclockBatch<FromTime, IntoTime> {
53    pub updates: Vec<(FromTime, IntoTime, Diff)>,
54    pub upper: Antichain<IntoTime>,
55}
56
57impl<FromTime, IntoTime, Handle> ReclockOperator<FromTime, IntoTime, Handle>
58where
59    FromTime: Timestamp,
60    IntoTime: Timestamp + Lattice,
61    Handle: RemapHandle<FromTime = FromTime, IntoTime = IntoTime>,
62{
63    /// Construct a new [ReclockOperator] from the given collection metadata
64    pub async fn new(remap_handle: Handle) -> (Self, ReclockBatch<FromTime, IntoTime>) {
65        let upper = remap_handle.upper().clone();
66
67        let mut operator = Self {
68            upper: Antichain::from_elem(IntoTime::minimum()),
69            source_upper: MutableAntichain::new(),
70            remap_handle,
71        };
72
73        // Load the initial state that might exist in the shard
74        let trace_batch = if upper.elements() != [IntoTime::minimum()] {
75            operator.sync(upper.borrow()).await
76        } else {
77            ReclockBatch {
78                updates: vec![],
79                upper: Antichain::from_elem(IntoTime::minimum()),
80            }
81        };
82
83        (operator, trace_batch)
84    }
85
86    /// Syncs the state of this operator to match that of the persist shard until the provided
87    /// frontier
88    async fn sync(
89        &mut self,
90        target_upper: AntichainRef<'_, IntoTime>,
91    ) -> ReclockBatch<FromTime, IntoTime> {
92        let mut updates: Vec<(FromTime, IntoTime, Diff)> = Vec::new();
93
94        // Tail the remap collection until we reach the target upper frontier. Note that, in the
95        // common case, we are also the writer, so we are waiting to read-back what we wrote
96        while PartialOrder::less_than(&self.upper.borrow(), &target_upper) {
97            let (mut batch, upper) = self
98                .remap_handle
99                .next()
100                .await
101                .expect("requested data after empty antichain");
102            self.upper = upper;
103            updates.append(&mut batch);
104        }
105
106        self.source_upper.update_iter(
107            updates
108                .iter()
109                .map(|(src_ts, _dest_ts, diff)| (src_ts.clone(), diff.into_inner())),
110        );
111
112        ReclockBatch {
113            updates,
114            upper: self.upper.clone(),
115        }
116    }
117
118    pub async fn mint(
119        &mut self,
120        binding_ts: IntoTime,
121        mut new_into_upper: Antichain<IntoTime>,
122        new_from_upper: AntichainRef<'_, FromTime>,
123    ) -> ReclockBatch<FromTime, IntoTime> {
124        assert!(!new_into_upper.less_equal(&binding_ts));
125        // The updates to the remap trace that occured during minting.
126        let mut batch = ReclockBatch {
127            updates: vec![],
128            upper: self.upper.clone(),
129        };
130
131        while *self.upper == [IntoTime::minimum()]
132            || (PartialOrder::less_equal(&self.source_upper.frontier(), &new_from_upper)
133                && PartialOrder::less_than(&self.upper, &new_into_upper)
134                && self.upper.less_equal(&binding_ts))
135        {
136            // If source is closed, close remap shard as well.
137            if new_from_upper.is_empty() {
138                new_into_upper = Antichain::new();
139            }
140
141            // If this is the first binding we mint then we will mint it at the minimum target
142            // timestamp. The first source upper is always the upper of the snapshot and by mapping
143            // it to the minimum target timestamp we make it so that the final shard never appears
144            // empty at any timestamp.
145            let binding_ts = if *self.upper == [IntoTime::minimum()] {
146                IntoTime::minimum()
147            } else {
148                binding_ts.clone()
149            };
150
151            let mut updates = vec![];
152            for src_ts in self.source_upper.frontier().iter().cloned() {
153                updates.push((src_ts, binding_ts.clone(), Diff::MINUS_ONE));
154            }
155            for src_ts in new_from_upper.iter().cloned() {
156                updates.push((src_ts, binding_ts.clone(), Diff::ONE));
157            }
158            consolidation::consolidate_updates(&mut updates);
159
160            let new_batch = match self.append_batch(updates, &new_into_upper).await {
161                Ok(trace_batch) => trace_batch,
162                Err(UpperMismatch { current, .. }) => self.sync(current.borrow()).await,
163            };
164            batch.updates.extend(new_batch.updates);
165            batch.upper = new_batch.upper;
166        }
167
168        batch
169    }
170
171    /// Appends the provided updates to the remap collection at the next available minting
172    /// IntoTime and updates this operator's in-memory state accordingly.
173    ///
174    /// If an attempt to mint bindings fails due to another process having raced and appended
175    /// bindings concurrently then the current global upper will be returned as an error. This is
176    /// the frontier that this operator must be synced to for a future append attempt to have any
177    /// chance of success.
178    async fn append_batch(
179        &mut self,
180        updates: Vec<(FromTime, IntoTime, Diff)>,
181        new_upper: &Antichain<IntoTime>,
182    ) -> Result<ReclockBatch<FromTime, IntoTime>, UpperMismatch<IntoTime>> {
183        match self
184            .remap_handle
185            .compare_and_append(updates, self.upper.clone(), new_upper.clone())
186            .await
187        {
188            // We have successfully produced data in the remap collection so let's read back what
189            // we wrote to update our local state
190            Ok(()) => Ok(self.sync(new_upper.borrow()).await),
191            Err(mismatch) => Err(mismatch),
192        }
193    }
194}
195
#[cfg(test)]
mod tests {
    use std::cell::RefCell;
    use std::rc::Rc;
    use std::str::FromStr;
    use std::sync::Arc;
    use std::sync::LazyLock;
    use std::time::Duration;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{GlobalId, RelationDesc, SqlScalarType, Timestamp};
    use mz_storage_client::util::remap_handle::RemapHandle;
    use mz_storage_types::StorageDiff;
    use mz_storage_types::controller::CollectionMetadata;
    use mz_storage_types::sources::kafka::{self, RangeBound as RB};
    use mz_storage_types::sources::{MzOffset, SourceData};
    use mz_timely_util::order::Partitioned;
    use timely::progress::Timestamp as _;
    use tokio::sync::watch;

    use super::*;

    // Reader lease duration configured on the shared test persist cache: 15 minutes. Note that
    // despite the `_MS` suffix this is a `Duration`, not a number of milliseconds.
    static PERSIST_READER_LEASE_TIMEOUT_MS: Duration = Duration::from_secs(60 * 15);

    // Persist client cache shared by all tests in this module. The tests all use the in-memory
    // (`mem://`) location and isolate themselves from each other with fresh `ShardId`s.
    static PERSIST_CACHE: LazyLock<Arc<PersistClientCache>> = LazyLock::new(|| {
        let persistcfg = PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
        persistcfg.set_reader_lease_duration(PERSIST_READER_LEASE_TIMEOUT_MS);
        Arc::new(PersistClientCache::new(
            persistcfg,
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        ))
    });

    // Relation description of the remap (progress) shard used by the tests: a non-nullable
    // `partition` column holding a range of numerics, and a nullable `offset` column.
    static PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
        RelationDesc::builder()
            .with_column(
                "partition",
                SqlScalarType::Range {
                    element_type: Box::new(SqlScalarType::Numeric { max_scale: None }),
                }
                .nullable(false),
            )
            .with_column("offset", SqlScalarType::UInt64.nullable(true))
            .finish()
    });

    /// Creates a `ReclockOperator` over the remap shard `shard`, read as of `as_of`, backed by a
    /// persist handle on the in-memory (`mem://`) location.
    ///
    /// If the shard has no bindings yet (the initial batch's upper is the minimum timestamp),
    /// this mints the minimum source frontier at timestamp `0` with a new upper of `1`. Every
    /// test therefore starts from an initialized remap collection. Returns the operator and the
    /// initial batch.
    async fn make_test_operator(
        shard: ShardId,
        as_of: Antichain<Timestamp>,
    ) -> (
        ReclockOperator<
            kafka::KafkaTimestamp,
            Timestamp,
            impl RemapHandle<FromTime = kafka::KafkaTimestamp, IntoTime = Timestamp>,
        >,
        ReclockBatch<kafka::KafkaTimestamp, Timestamp>,
    ) {
        let metadata = CollectionMetadata {
            persist_location: PersistLocation {
                blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            },
            data_shard: shard,
            relation_desc: RelationDesc::empty(),
            txns_shard: None,
        };

        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));

        // Always in read-write mode for tests.
        let (_read_only_tx, read_only_rx) = watch::channel(false);
        let remap_handle = crate::source::reclock::compat::PersistHandle::new(
            Arc::clone(&*PERSIST_CACHE),
            read_only_rx,
            metadata,
            as_of.clone(),
            write_frontier,
            GlobalId::Explain,
            "unittest",
            0,
            1,
            PROGRESS_DESC.clone(),
            GlobalId::Explain,
        )
        .await
        .unwrap();

        let (mut operator, mut initial_batch) = ReclockOperator::new(remap_handle).await;

        // If the shard is still empty (no bindings were loaded), mint the initial binding.
        if *initial_batch.upper == [Timestamp::minimum()] {
            // In the tests we always reclock the minimum source frontier to the minimum target
            // frontier, which we do in this step.
            initial_batch = operator
                .mint(
                    0.into(),
                    Antichain::from_elem(1.into()),
                    Antichain::from_elem(Partitioned::minimum()).borrow(),
                )
                .await;
        }

        (operator, initial_batch)
    }

    /// Generates an `Antichain<kafka::KafkaTimestamp>` where all the provided partitions are at
    /// the specified offset. The gaps before, between and after them are filled with range
    /// timestamps at offset zero.
    ///
    /// The assertion requires each partition to come after the previous one, i.e. partitions
    /// must be supplied in increasing order.
    fn partitioned_frontier<I>(items: I) -> Antichain<kafka::KafkaTimestamp>
    where
        I: IntoIterator<Item = (i32, MzOffset)>,
    {
        let mut frontier = Antichain::new();
        let mut prev = RB::NegInfinity;
        for (pid, offset) in items {
            assert!(prev < RB::before(pid));
            let gap = Partitioned::new_range(prev, RB::before(pid), MzOffset::from(0));
            frontier.extend([gap, Partitioned::new_singleton(RB::exact(pid), offset)]);
            prev = RB::after(pid);
        }
        frontier.insert(Partitioned::new_range(
            prev,
            RB::PosInfinity,
            MzOffset::from(0),
        ));
        frontier
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    async fn test_basic_usage() {
        let (mut operator, _) =
            make_test_operator(ShardId::new(), Antichain::from_elem(0.into())).await;

        // Advance partition 0 to offset 4 and bind that source frontier to timestamp 1000.
        let source_upper = partitioned_frontier([(0, MzOffset::from(4))]);
        let mut batch = operator
            .mint(
                1000.into(),
                Antichain::from_elem(1001.into()),
                source_upper.borrow(),
            )
            .await;
        // The initial frontier (everything at offset 0) is retracted and replaced by the new one.
        let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
            updates: vec![
                (
                    Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::NegInfinity, RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::MINUS_ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(4)),
                    1000.into(),
                    Diff::ONE,
                ),
            ],
            upper: Antichain::from_elem(Timestamp::from(1001)),
        };
        batch.updates.sort();
        expected_batch.updates.sort();
        assert_eq!(batch, expected_batch);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    async fn test_compaction() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };

        let remap_shard = ShardId::new();

        let persist_client = PERSIST_CACHE
            .open(persist_location)
            .await
            .expect("error creating persist client");

        let mut remap_read_handle = persist_client
            .open_critical_since::<SourceData, (), Timestamp, StorageDiff, u64>(
                remap_shard,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::from_purpose("test_since_hold"),
            )
            .await
            .expect("error opening persist shard");

        let (mut operator, _batch) =
            make_test_operator(remap_shard, Antichain::from_elem(0.into())).await;

        // Mint a few bindings
        let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
        operator
            .mint(
                1000.into(),
                Antichain::from_elem(1001.into()),
                source_upper.borrow(),
            )
            .await;

        let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
        operator
            .mint(
                2000.into(),
                Antichain::from_elem(2001.into()),
                source_upper.borrow(),
            )
            .await;

        // Downgrade the since of the remap shard to 1000. The bindings minted at or before 1000
        // are then expected to be read back consolidated at 1000 (see `expected_batch` below),
        // while the binding at 2000 remains distinct.
        remap_read_handle
            .compare_and_downgrade_since(&0, (&0, &Antichain::from_elem(1000.into())))
            .await
            .unwrap();

        // Starting a new operator with an `as_of` is the same as having compacted
        let (_operator, mut initial_batch) =
            make_test_operator(remap_shard, Antichain::from_elem(1000.into())).await;

        let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
            updates: vec![
                (
                    Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                    2000.into(),
                    Diff::MINUS_ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(5)),
                    2000.into(),
                    Diff::ONE,
                ),
            ],
            upper: Antichain::from_elem(Timestamp::from(2001)),
        };
        expected_batch.updates.sort();
        initial_batch.updates.sort();
        assert_eq!(initial_batch, expected_batch);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    async fn test_concurrency() {
        // Create two operators pointing to the same shard
        let shared_shard = ShardId::new();
        let (mut op_a, _) = make_test_operator(shared_shard, Antichain::from_elem(0.into())).await;
        let (mut op_b, _) = make_test_operator(shared_shard, Antichain::from_elem(0.into())).await;

        // Mint some bindings through operator A
        let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
        let mut batch = op_a
            .mint(
                1000.into(),
                Antichain::from_elem(1001.into()),
                source_upper.borrow(),
            )
            .await;
        let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
            updates: vec![
                (
                    Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::NegInfinity, RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::MINUS_ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                    1000.into(),
                    Diff::ONE,
                ),
            ],
            upper: Antichain::from_elem(Timestamp::from(1001)),
        };
        batch.updates.sort();
        expected_batch.updates.sort();
        assert_eq!(batch, expected_batch);

        // Operator B should attempt to mint in one go, fail, re-sync, and retry only for the
        // bindings that still need minting. Its returned batch therefore also contains the
        // bindings operator A wrote.
        let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
        let mut batch = op_b
            .mint(
                11000.into(),
                Antichain::from_elem(11001.into()),
                source_upper.borrow(),
            )
            .await;
        expected_batch.updates.extend([
            (
                Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                11000.into(),
                Diff::MINUS_ONE,
            ),
            (
                Partitioned::new_singleton(RB::exact(0), MzOffset::from(5)),
                11000.into(),
                Diff::ONE,
            ),
        ]);
        expected_batch.upper = Antichain::from_elem(Timestamp::from(11001));
        batch.updates.sort();
        expected_batch.updates.sort();
        assert_eq!(batch, expected_batch);
    }

    // Regression test for
    // https://github.com/MaterializeInc/database-issues/issues/4216.
    #[mz_ore::test(tokio::test(start_paused = true))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
    async fn test_since_hold() {
        let binding_shard = ShardId::new();

        let (mut operator, _) =
            make_test_operator(binding_shard, Antichain::from_elem(0.into())).await;

        // We do multiple rounds of minting. This will downgrade the since of
        // the internal listen. If we didn't make sure to also heartbeat the
        // internal handle that holds back the overall remap since, the checks
        // below would fail.
        //
        // We do two rounds and advance the time by half the lease timeout in
        // between so that the "listen handle" will not timeout but the internal
        // handle used for holding back the since will timeout.

        tokio::time::advance(PERSIST_READER_LEASE_TIMEOUT_MS / 2 + Duration::from_millis(1)).await;
        let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
        let _ = operator
            .mint(
                1000.into(),
                Antichain::from_elem(1001.into()),
                source_upper.borrow(),
            )
            .await;

        tokio::time::advance(PERSIST_READER_LEASE_TIMEOUT_MS / 2 + Duration::from_millis(1)).await;
        let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
        let _ = operator
            .mint(
                2000.into(),
                Antichain::from_elem(2001.into()),
                source_upper.borrow(),
            )
            .await;

        // Allow time for background maintenance work, which does lease
        // expiration. 1 ms is enough here, we just need to yield to allow the
        // background task to be "scheduled".
        tokio::time::sleep(Duration::from_millis(1)).await;

        // Starting a new operator with an `as_of` of `0`, to verify that
        // holding back the `since` of the remap shard works as expected.
        let (_operator, _batch) =
            make_test_operator(binding_shard, Antichain::from_elem(0.into())).await;

        // Also manually assert the since of the remap shard.
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };

        let persist_client = PERSIST_CACHE
            .open(persist_location)
            .await
            .expect("error creating persist client");

        let read_handle = persist_client
            .open_leased_reader::<SourceData, (), Timestamp, StorageDiff>(
                binding_shard,
                Arc::new(PROGRESS_DESC.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("test_since_hold"),
                true,
            )
            .await
            .expect("error opening persist shard");

        assert_eq!(
            Antichain::from_elem(0.into()),
            read_handle.since().to_owned()
        );
    }
}
618        );
619    }
620}