Skip to main content

mz_storage/source/
reclock.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
//! The `ReclockOperator` observes the progress of a stream that is
//! timestamped with some source time `FromTime` and generates bindings that describe how the
//! collection should evolve in target time `IntoTime`.
13use differential_dataflow::consolidation;
14use differential_dataflow::lattice::Lattice;
15use mz_persist_client::error::UpperMismatch;
16use mz_repr::Diff;
17use mz_storage_client::util::remap_handle::RemapHandle;
18use timely::order::PartialOrder;
19use timely::progress::Timestamp;
20use timely::progress::frontier::{Antichain, AntichainRef, MutableAntichain};
21
/// Provides `PersistHandle`, which the tests in this file construct from a persist client cache
/// and collection metadata and hand to `ReclockOperator::new` as its remap handle.
pub mod compat;
23
24/// The `ReclockOperator` is responsible for observing progress in the `FromTime` domain and
25/// consume messages from a ticker of progress in the `IntoTime` domain. When the source frontier
26/// advances and the ticker ticks the `ReclockOperator` will generate the data that describe this
27/// correspondence and write them out to its provided remap handle. The output generated by the
28/// reclock operator can be thought of as `Collection<G, FromTime>` where `G::Timestamp` is
29/// `IntoTime`.
30///
31/// The `ReclockOperator` will always maintain the invariant that for any time `IntoTime` the remap
32/// collection accumulates into an Antichain where each `FromTime` timestamp has frequency `1`. In
33/// other words the remap collection describes a well formed `Antichain<FromTime>` as it is
34/// marching forwards.
35#[derive(Debug)]
36pub struct ReclockOperator<
37    FromTime: Timestamp,
38    IntoTime: Timestamp + Lattice,
39    Handle: RemapHandle<FromTime = FromTime, IntoTime = IntoTime>,
40> {
41    /// Upper frontier of the partial remap trace
42    upper: Antichain<IntoTime>,
43    /// The upper frontier in terms of `FromTime`. Any attempt to reclock messages beyond this
44    /// frontier will lead to minting new bindings.
45    source_upper: MutableAntichain<FromTime>,
46
47    /// A handle allowing this operator to publish updates to and read back from the remap collection
48    remap_handle: Handle,
49}
50
/// A batch of updates to the remap collection, together with the upper frontier of the remap
/// trace that the operator had reached once those updates were observed.
#[derive(Clone, Debug, PartialEq)]
pub struct ReclockBatch<FromTime, IntoTime> {
    /// Remap updates as `(source timestamp, binding timestamp, diff)` triples.
    pub updates: Vec<(FromTime, IntoTime, Diff)>,
    /// The upper frontier of the remap trace after `updates`.
    pub upper: Antichain<IntoTime>,
}
56
57impl<FromTime, IntoTime, Handle> ReclockOperator<FromTime, IntoTime, Handle>
58where
59    FromTime: Timestamp,
60    IntoTime: Timestamp + Lattice,
61    Handle: RemapHandle<FromTime = FromTime, IntoTime = IntoTime>,
62{
63    /// Construct a new [ReclockOperator] from the given collection metadata
64    pub async fn new(remap_handle: Handle) -> (Self, ReclockBatch<FromTime, IntoTime>) {
65        let upper = remap_handle.upper().clone();
66
67        let mut operator = Self {
68            upper: Antichain::from_elem(IntoTime::minimum()),
69            source_upper: MutableAntichain::new(),
70            remap_handle,
71        };
72
73        // Load the initial state that might exist in the shard
74        let trace_batch = if upper.elements() != [IntoTime::minimum()] {
75            operator.sync(upper.borrow()).await
76        } else {
77            ReclockBatch {
78                updates: vec![],
79                upper: Antichain::from_elem(IntoTime::minimum()),
80            }
81        };
82
83        (operator, trace_batch)
84    }
85
86    /// Syncs the state of this operator to match that of the persist shard until the provided
87    /// frontier
88    async fn sync(
89        &mut self,
90        target_upper: AntichainRef<'_, IntoTime>,
91    ) -> ReclockBatch<FromTime, IntoTime> {
92        let mut updates: Vec<(FromTime, IntoTime, Diff)> = Vec::new();
93
94        // Tail the remap collection until we reach the target upper frontier. Note that, in the
95        // common case, we are also the writer, so we are waiting to read-back what we wrote
96        while PartialOrder::less_than(&self.upper.borrow(), &target_upper) {
97            let (mut batch, upper) = self
98                .remap_handle
99                .next()
100                .await
101                .expect("requested data after empty antichain");
102            self.upper = upper;
103            updates.append(&mut batch);
104        }
105
106        self.source_upper.update_iter(
107            updates
108                .iter()
109                .map(|(src_ts, _dest_ts, diff)| (src_ts.clone(), diff.into_inner())),
110        );
111
112        ReclockBatch {
113            updates,
114            upper: self.upper.clone(),
115        }
116    }
117
118    pub async fn mint(
119        &mut self,
120        binding_ts: IntoTime,
121        mut new_into_upper: Antichain<IntoTime>,
122        new_from_upper: AntichainRef<'_, FromTime>,
123    ) -> ReclockBatch<FromTime, IntoTime> {
124        assert!(!new_into_upper.less_equal(&binding_ts));
125        // The updates to the remap trace that occured during minting.
126        let mut batch = ReclockBatch {
127            updates: vec![],
128            upper: self.upper.clone(),
129        };
130
131        while *self.upper == [IntoTime::minimum()]
132            || (PartialOrder::less_equal(&self.source_upper.frontier(), &new_from_upper)
133                && PartialOrder::less_than(&self.upper, &new_into_upper)
134                && self.upper.less_equal(&binding_ts))
135        {
136            // If source is closed, close remap shard as well.
137            if new_from_upper.is_empty() {
138                new_into_upper = Antichain::new();
139            }
140
141            // If this is the first binding we mint then we will mint it at the minimum target
142            // timestamp. The first source upper is always the upper of the snapshot and by mapping
143            // it to the minimum target timestamp we make it so that the final shard never appears
144            // empty at any timestamp.
145            let binding_ts = if *self.upper == [IntoTime::minimum()] {
146                IntoTime::minimum()
147            } else {
148                binding_ts.clone()
149            };
150
151            let mut updates = vec![];
152            for src_ts in self.source_upper.frontier().iter().cloned() {
153                updates.push((src_ts, binding_ts.clone(), Diff::MINUS_ONE));
154            }
155            for src_ts in new_from_upper.iter().cloned() {
156                updates.push((src_ts, binding_ts.clone(), Diff::ONE));
157            }
158            consolidation::consolidate_updates(&mut updates);
159
160            let new_batch = match self.append_batch(updates, &new_into_upper).await {
161                Ok(trace_batch) => trace_batch,
162                Err(UpperMismatch { current, .. }) => self.sync(current.borrow()).await,
163            };
164            batch.updates.extend(new_batch.updates);
165            batch.upper = new_batch.upper;
166        }
167
168        batch
169    }
170
171    /// Appends the provided updates to the remap collection at the next available minting
172    /// IntoTime and updates this operator's in-memory state accordingly.
173    ///
174    /// If an attempt to mint bindings fails due to another process having raced and appended
175    /// bindings concurrently then the current global upper will be returned as an error. This is
176    /// the frontier that this operator must be synced to for a future append attempt to have any
177    /// chance of success.
178    async fn append_batch(
179        &mut self,
180        updates: Vec<(FromTime, IntoTime, Diff)>,
181        new_upper: &Antichain<IntoTime>,
182    ) -> Result<ReclockBatch<FromTime, IntoTime>, UpperMismatch<IntoTime>> {
183        match self
184            .remap_handle
185            .compare_and_append(updates, self.upper.clone(), new_upper.clone())
186            .await
187        {
188            // We have successfully produced data in the remap collection so let's read back what
189            // we wrote to update our local state
190            Ok(()) => Ok(self.sync(new_upper.borrow()).await),
191            Err(mismatch) => Err(mismatch),
192        }
193    }
194}
195
196#[cfg(test)]
197mod tests {
198    use std::cell::RefCell;
199    use std::rc::Rc;
200    use std::str::FromStr;
201    use std::sync::Arc;
202    use std::sync::LazyLock;
203    use std::time::Duration;
204
205    use super::*;
206    use mz_build_info::DUMMY_BUILD_INFO;
207    use mz_ore::metrics::MetricsRegistry;
208    use mz_ore::now::SYSTEM_TIME;
209    use mz_ore::url::SensitiveUrl;
210    use mz_persist_client::cache::PersistClientCache;
211    use mz_persist_client::cfg::PersistConfig;
212    use mz_persist_client::critical::Opaque;
213    use mz_persist_client::rpc::PubSubClientConnection;
214    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
215    use mz_persist_types::codec_impls::UnitSchema;
216    use mz_repr::{GlobalId, RelationDesc, SqlScalarType, Timestamp};
217    use mz_storage_client::util::remap_handle::RemapHandle;
218    use mz_storage_types::StorageDiff;
219    use mz_storage_types::controller::CollectionMetadata;
220    use mz_storage_types::sources::kafka::{self, RangeBound as RB};
221    use mz_storage_types::sources::{MzOffset, SourceData};
222    use mz_timely_util::order::Partitioned;
223    use timely::progress::Timestamp as _;
224    use tokio::sync::watch;
225
226    // 15 minutes
227    static PERSIST_READER_LEASE_TIMEOUT_MS: Duration = Duration::from_secs(60 * 15);
228
229    static PERSIST_CACHE: LazyLock<Arc<PersistClientCache>> = LazyLock::new(|| {
230        let persistcfg = PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
231        persistcfg.set_reader_lease_duration(PERSIST_READER_LEASE_TIMEOUT_MS);
232        Arc::new(PersistClientCache::new(
233            persistcfg,
234            &MetricsRegistry::new(),
235            |_, _| PubSubClientConnection::noop(),
236        ))
237    });
238
239    static PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
240        RelationDesc::builder()
241            .with_column(
242                "partition",
243                SqlScalarType::Range {
244                    element_type: Box::new(SqlScalarType::Numeric { max_scale: None }),
245                }
246                .nullable(false),
247            )
248            .with_column("offset", SqlScalarType::UInt64.nullable(true))
249            .finish()
250    });
251
252    async fn make_test_operator(
253        shard: ShardId,
254        as_of: Antichain<Timestamp>,
255    ) -> (
256        ReclockOperator<
257            kafka::KafkaTimestamp,
258            Timestamp,
259            impl RemapHandle<FromTime = kafka::KafkaTimestamp, IntoTime = Timestamp>,
260        >,
261        ReclockBatch<kafka::KafkaTimestamp, Timestamp>,
262    ) {
263        let metadata = CollectionMetadata {
264            persist_location: PersistLocation {
265                blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
266                consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
267            },
268            data_shard: shard,
269            relation_desc: RelationDesc::empty(),
270            txns_shard: None,
271        };
272
273        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
274
275        // Always in read-write mode for tests.
276        let (_read_only_tx, read_only_rx) = watch::channel(false);
277        let remap_handle = crate::source::reclock::compat::PersistHandle::new(
278            Arc::clone(&*PERSIST_CACHE),
279            read_only_rx,
280            metadata,
281            as_of.clone(),
282            write_frontier,
283            GlobalId::Explain,
284            "unittest",
285            0,
286            1,
287            PROGRESS_DESC.clone(),
288            GlobalId::Explain,
289        )
290        .await
291        .unwrap();
292
293        let (mut operator, mut initial_batch) = ReclockOperator::new(remap_handle).await;
294
295        // Push any updates that might already exist in the persist shard to the follower.
296        if *initial_batch.upper == [Timestamp::minimum()] {
297            // In the tests we always reclock the minimum source frontier to the minimum target
298            // frontier, which we do in this step.
299            initial_batch = operator
300                .mint(
301                    0.into(),
302                    Antichain::from_elem(1.into()),
303                    Antichain::from_elem(Partitioned::minimum()).borrow(),
304                )
305                .await;
306        }
307
308        (operator, initial_batch)
309    }
310
311    /// Generates a [`kafka::NativeFrontier`] antichain where all the provided
312    /// partitions are at the specified offset and the gaps in between are filled with range
313    /// timestamps at offset zero.
314    fn partitioned_frontier<I>(items: I) -> Antichain<kafka::KafkaTimestamp>
315    where
316        I: IntoIterator<Item = (i32, MzOffset)>,
317    {
318        let mut frontier = Antichain::new();
319        let mut prev = RB::NegInfinity;
320        for (pid, offset) in items {
321            assert!(prev < RB::before(pid));
322            let gap = Partitioned::new_range(prev, RB::before(pid), MzOffset::from(0));
323            frontier.extend([gap, Partitioned::new_singleton(RB::exact(pid), offset)]);
324            prev = RB::after(pid);
325        }
326        frontier.insert(Partitioned::new_range(
327            prev,
328            RB::PosInfinity,
329            MzOffset::from(0),
330        ));
331        frontier
332    }
333
334    #[mz_ore::test(tokio::test)]
335    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
336    async fn test_basic_usage() {
337        let (mut operator, _) =
338            make_test_operator(ShardId::new(), Antichain::from_elem(0.into())).await;
339
340        // Reclock offsets 1 and 3 to timestamp 1000
341        let source_upper = partitioned_frontier([(0, MzOffset::from(4))]);
342        let mut batch = operator
343            .mint(
344                1000.into(),
345                Antichain::from_elem(1001.into()),
346                source_upper.borrow(),
347            )
348            .await;
349        let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
350            updates: vec![
351                (
352                    Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
353                    1000.into(),
354                    Diff::ONE,
355                ),
356                (
357                    Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
358                    1000.into(),
359                    Diff::ONE,
360                ),
361                (
362                    Partitioned::new_range(RB::NegInfinity, RB::PosInfinity, MzOffset::from(0)),
363                    1000.into(),
364                    Diff::MINUS_ONE,
365                ),
366                (
367                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(4)),
368                    1000.into(),
369                    Diff::ONE,
370                ),
371            ],
372            upper: Antichain::from_elem(Timestamp::from(1001)),
373        };
374        batch.updates.sort();
375        expected_batch.updates.sort();
376        assert_eq!(batch, expected_batch);
377    }
378
379    #[mz_ore::test(tokio::test)]
380    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
381    async fn test_compaction() {
382        let persist_location = PersistLocation {
383            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
384            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
385        };
386
387        let remap_shard = ShardId::new();
388
389        let persist_client = PERSIST_CACHE
390            .open(persist_location)
391            .await
392            .expect("error creating persist client");
393
394        let mut remap_read_handle = persist_client
395            .open_critical_since::<SourceData, (), Timestamp, StorageDiff>(
396                remap_shard,
397                PersistClient::CONTROLLER_CRITICAL_SINCE,
398                Opaque::encode(&0u64),
399                Diagnostics::from_purpose("test_since_hold"),
400            )
401            .await
402            .expect("error opening persist shard");
403
404        let (mut operator, _batch) =
405            make_test_operator(remap_shard, Antichain::from_elem(0.into())).await;
406
407        // Ming a few bindings
408        let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
409        operator
410            .mint(
411                1000.into(),
412                Antichain::from_elem(1001.into()),
413                source_upper.borrow(),
414            )
415            .await;
416
417        let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
418        operator
419            .mint(
420                2000.into(),
421                Antichain::from_elem(2001.into()),
422                source_upper.borrow(),
423            )
424            .await;
425
426        // Compact enough so that offsets >= 3 remain uncompacted
427        remap_read_handle
428            .compare_and_downgrade_since(
429                &Opaque::encode(&0u64),
430                (&Opaque::encode(&0u64), &Antichain::from_elem(1000.into())),
431            )
432            .await
433            .unwrap();
434
435        // Starting a new operator with an `as_of` is the same as having compacted
436        let (_operator, mut initial_batch) =
437            make_test_operator(remap_shard, Antichain::from_elem(1000.into())).await;
438
439        let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
440            updates: vec![
441                (
442                    Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
443                    1000.into(),
444                    Diff::ONE,
445                ),
446                (
447                    Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
448                    1000.into(),
449                    Diff::ONE,
450                ),
451                (
452                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
453                    1000.into(),
454                    Diff::ONE,
455                ),
456                (
457                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
458                    2000.into(),
459                    Diff::MINUS_ONE,
460                ),
461                (
462                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(5)),
463                    2000.into(),
464                    Diff::ONE,
465                ),
466            ],
467            upper: Antichain::from_elem(Timestamp::from(2001)),
468        };
469        expected_batch.updates.sort();
470        initial_batch.updates.sort();
471        assert_eq!(initial_batch, expected_batch);
472    }
473
474    #[mz_ore::test(tokio::test)]
475    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
476    async fn test_concurrency() {
477        // Create two operators pointing to the same shard
478        let shared_shard = ShardId::new();
479        let (mut op_a, _) = make_test_operator(shared_shard, Antichain::from_elem(0.into())).await;
480        let (mut op_b, _) = make_test_operator(shared_shard, Antichain::from_elem(0.into())).await;
481
482        // Mint some bindings through operator A
483        let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
484        let mut batch = op_a
485            .mint(
486                1000.into(),
487                Antichain::from_elem(1001.into()),
488                source_upper.borrow(),
489            )
490            .await;
491        let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
492            updates: vec![
493                (
494                    Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
495                    1000.into(),
496                    Diff::ONE,
497                ),
498                (
499                    Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
500                    1000.into(),
501                    Diff::ONE,
502                ),
503                (
504                    Partitioned::new_range(RB::NegInfinity, RB::PosInfinity, MzOffset::from(0)),
505                    1000.into(),
506                    Diff::MINUS_ONE,
507                ),
508                (
509                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
510                    1000.into(),
511                    Diff::ONE,
512                ),
513            ],
514            upper: Antichain::from_elem(Timestamp::from(1001)),
515        };
516        batch.updates.sort();
517        expected_batch.updates.sort();
518        assert_eq!(batch, expected_batch);
519
520        // Operator B should attempt to mint in one go, fail, re-sync, and retry only for the
521        // bindings that still need minting
522        let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
523        let mut batch = op_b
524            .mint(
525                11000.into(),
526                Antichain::from_elem(11001.into()),
527                source_upper.borrow(),
528            )
529            .await;
530        expected_batch.updates.extend([
531            (
532                Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
533                11000.into(),
534                Diff::MINUS_ONE,
535            ),
536            (
537                Partitioned::new_singleton(RB::exact(0), MzOffset::from(5)),
538                11000.into(),
539                Diff::ONE,
540            ),
541        ]);
542        expected_batch.upper = Antichain::from_elem(Timestamp::from(11001));
543        batch.updates.sort();
544        expected_batch.updates.sort();
545        assert_eq!(batch, expected_batch);
546    }
547
548    // Regression test for
549    // https://github.com/MaterializeInc/database-issues/issues/4216.
550    #[mz_ore::test(tokio::test(start_paused = true))]
551    #[cfg_attr(miri, ignore)]
552    // error: unsupported operation: can't call foreign function `decNumberFromInt32` on OS `linux`
553    // This test depends on the exact details of when the Persist client flushes frontier progress
554    // to state, which is not public API and should not be essential for correctness. (Today, the
555    // as_of is held back by the spawned task in the async storage worker.)
556    #[ignore]
557    async fn test_since_hold() {
558        let binding_shard = ShardId::new();
559
560        let (mut operator, _) =
561            make_test_operator(binding_shard, Antichain::from_elem(0.into())).await;
562
563        // We do multiple rounds of minting. This will downgrade the since of
564        // the internal listen. If we didn't make sure to also heartbeat the
565        // internal handle that holds back the overall remap since the checks
566        // below would fail.
567        //
568        // We do two rounds and advance the time by half the lease timeout in
569        // between so that the "listen handle" will not timeout but the internal
570        // handle used for holding back the since will timeout.
571
572        tokio::time::advance(PERSIST_READER_LEASE_TIMEOUT_MS / 2 + Duration::from_millis(1)).await;
573        let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
574        let _ = operator
575            .mint(
576                1000.into(),
577                Antichain::from_elem(1001.into()),
578                source_upper.borrow(),
579            )
580            .await;
581
582        tokio::time::advance(PERSIST_READER_LEASE_TIMEOUT_MS / 2 + Duration::from_millis(1)).await;
583        let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
584        let _ = operator
585            .mint(
586                2000.into(),
587                Antichain::from_elem(2001.into()),
588                source_upper.borrow(),
589            )
590            .await;
591
592        // Allow time for background maintenance work, which does lease
593        // expiration. 1 ms is enough here, we just need to yield to allow the
594        // background task to be "scheduled".
595        tokio::time::sleep(Duration::from_millis(1)).await;
596
597        // Starting a new operator with an `as_of` of `0`, to verify that
598        // holding back the `since` of the remap shard works as expected.
599        let (_operator, _batch) =
600            make_test_operator(binding_shard, Antichain::from_elem(0.into())).await;
601
602        // Also manually assert the since of the remap shard.
603        let persist_location = PersistLocation {
604            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
605            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
606        };
607
608        let persist_client = PERSIST_CACHE
609            .open(persist_location)
610            .await
611            .expect("error creating persist client");
612
613        let read_handle = persist_client
614            .open_leased_reader::<SourceData, (), Timestamp, StorageDiff>(
615                binding_shard,
616                Arc::new(PROGRESS_DESC.clone()),
617                Arc::new(UnitSchema),
618                Diagnostics::from_purpose("test_since_hold"),
619                true,
620            )
621            .await
622            .expect("error opening persist shard");
623
624        assert_eq!(
625            Antichain::from_elem(0.into()),
626            read_handle.since().to_owned()
627        );
628    }
629}