1use differential_dataflow::consolidation;
14use differential_dataflow::lattice::Lattice;
15use mz_persist_client::error::UpperMismatch;
16use mz_repr::Diff;
17use mz_storage_client::util::remap_handle::RemapHandle;
18use timely::order::PartialOrder;
19use timely::progress::Timestamp;
20use timely::progress::frontier::{Antichain, AntichainRef, MutableAntichain};
21
22pub mod compat;
23
24#[derive(Debug)]
36pub struct ReclockOperator<
37 FromTime: Timestamp,
38 IntoTime: Timestamp + Lattice,
39 Handle: RemapHandle<FromTime = FromTime, IntoTime = IntoTime>,
40> {
41 upper: Antichain<IntoTime>,
43 source_upper: MutableAntichain<FromTime>,
46
47 remap_handle: Handle,
49}
50
51#[derive(Clone, Debug, PartialEq)]
52pub struct ReclockBatch<FromTime, IntoTime> {
53 pub updates: Vec<(FromTime, IntoTime, Diff)>,
54 pub upper: Antichain<IntoTime>,
55}
56
57impl<FromTime, IntoTime, Handle> ReclockOperator<FromTime, IntoTime, Handle>
58where
59 FromTime: Timestamp,
60 IntoTime: Timestamp + Lattice,
61 Handle: RemapHandle<FromTime = FromTime, IntoTime = IntoTime>,
62{
63 pub async fn new(remap_handle: Handle) -> (Self, ReclockBatch<FromTime, IntoTime>) {
65 let upper = remap_handle.upper().clone();
66
67 let mut operator = Self {
68 upper: Antichain::from_elem(IntoTime::minimum()),
69 source_upper: MutableAntichain::new(),
70 remap_handle,
71 };
72
73 let trace_batch = if upper.elements() != [IntoTime::minimum()] {
75 operator.sync(upper.borrow()).await
76 } else {
77 ReclockBatch {
78 updates: vec![],
79 upper: Antichain::from_elem(IntoTime::minimum()),
80 }
81 };
82
83 (operator, trace_batch)
84 }
85
86 async fn sync(
89 &mut self,
90 target_upper: AntichainRef<'_, IntoTime>,
91 ) -> ReclockBatch<FromTime, IntoTime> {
92 let mut updates: Vec<(FromTime, IntoTime, Diff)> = Vec::new();
93
94 while PartialOrder::less_than(&self.upper.borrow(), &target_upper) {
97 let (mut batch, upper) = self
98 .remap_handle
99 .next()
100 .await
101 .expect("requested data after empty antichain");
102 self.upper = upper;
103 updates.append(&mut batch);
104 }
105
106 self.source_upper.update_iter(
107 updates
108 .iter()
109 .map(|(src_ts, _dest_ts, diff)| (src_ts.clone(), diff.into_inner())),
110 );
111
112 ReclockBatch {
113 updates,
114 upper: self.upper.clone(),
115 }
116 }
117
118 pub async fn mint(
119 &mut self,
120 binding_ts: IntoTime,
121 mut new_into_upper: Antichain<IntoTime>,
122 new_from_upper: AntichainRef<'_, FromTime>,
123 ) -> ReclockBatch<FromTime, IntoTime> {
124 assert!(!new_into_upper.less_equal(&binding_ts));
125 let mut batch = ReclockBatch {
127 updates: vec![],
128 upper: self.upper.clone(),
129 };
130
131 while *self.upper == [IntoTime::minimum()]
132 || (PartialOrder::less_equal(&self.source_upper.frontier(), &new_from_upper)
133 && PartialOrder::less_than(&self.upper, &new_into_upper)
134 && self.upper.less_equal(&binding_ts))
135 {
136 if new_from_upper.is_empty() {
138 new_into_upper = Antichain::new();
139 }
140
141 let binding_ts = if *self.upper == [IntoTime::minimum()] {
146 IntoTime::minimum()
147 } else {
148 binding_ts.clone()
149 };
150
151 let mut updates = vec![];
152 for src_ts in self.source_upper.frontier().iter().cloned() {
153 updates.push((src_ts, binding_ts.clone(), Diff::MINUS_ONE));
154 }
155 for src_ts in new_from_upper.iter().cloned() {
156 updates.push((src_ts, binding_ts.clone(), Diff::ONE));
157 }
158 consolidation::consolidate_updates(&mut updates);
159
160 let new_batch = match self.append_batch(updates, &new_into_upper).await {
161 Ok(trace_batch) => trace_batch,
162 Err(UpperMismatch { current, .. }) => self.sync(current.borrow()).await,
163 };
164 batch.updates.extend(new_batch.updates);
165 batch.upper = new_batch.upper;
166 }
167
168 batch
169 }
170
171 async fn append_batch(
179 &mut self,
180 updates: Vec<(FromTime, IntoTime, Diff)>,
181 new_upper: &Antichain<IntoTime>,
182 ) -> Result<ReclockBatch<FromTime, IntoTime>, UpperMismatch<IntoTime>> {
183 match self
184 .remap_handle
185 .compare_and_append(updates, self.upper.clone(), new_upper.clone())
186 .await
187 {
188 Ok(()) => Ok(self.sync(new_upper.borrow()).await),
191 Err(mismatch) => Err(mismatch),
192 }
193 }
194}
195
196#[cfg(test)]
197mod tests {
198 use std::cell::RefCell;
199 use std::rc::Rc;
200 use std::str::FromStr;
201 use std::sync::Arc;
202 use std::sync::LazyLock;
203 use std::time::Duration;
204
205 use mz_build_info::DUMMY_BUILD_INFO;
206 use mz_ore::metrics::MetricsRegistry;
207 use mz_ore::now::SYSTEM_TIME;
208 use mz_ore::url::SensitiveUrl;
209 use mz_persist_client::cache::PersistClientCache;
210 use mz_persist_client::cfg::PersistConfig;
211 use mz_persist_client::rpc::PubSubClientConnection;
212 use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
213 use mz_persist_types::codec_impls::UnitSchema;
214 use mz_repr::{GlobalId, RelationDesc, SqlScalarType, Timestamp};
215 use mz_storage_client::util::remap_handle::RemapHandle;
216 use mz_storage_types::StorageDiff;
217 use mz_storage_types::controller::CollectionMetadata;
218 use mz_storage_types::sources::kafka::{self, RangeBound as RB};
219 use mz_storage_types::sources::{MzOffset, SourceData};
220 use mz_timely_util::order::Partitioned;
221 use timely::progress::Timestamp as _;
222 use tokio::sync::watch;
223
224 use super::*;
225
/// Reader lease duration configured for the persist clients used by these tests: 15 minutes.
///
/// Despite the `_MS` suffix in its name, the value is a [`Duration`], not a millisecond count.
/// It is declared `const` rather than `static` because it is a plain immutable value that is
/// only ever used by value.
const PERSIST_READER_LEASE_TIMEOUT_MS: Duration = Duration::from_secs(15 * 60);
228
229 static PERSIST_CACHE: LazyLock<Arc<PersistClientCache>> = LazyLock::new(|| {
230 let persistcfg = PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
231 persistcfg.set_reader_lease_duration(PERSIST_READER_LEASE_TIMEOUT_MS);
232 Arc::new(PersistClientCache::new(
233 persistcfg,
234 &MetricsRegistry::new(),
235 |_, _| PubSubClientConnection::noop(),
236 ))
237 });
238
239 static PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
240 RelationDesc::builder()
241 .with_column(
242 "partition",
243 SqlScalarType::Range {
244 element_type: Box::new(SqlScalarType::Numeric { max_scale: None }),
245 }
246 .nullable(false),
247 )
248 .with_column("offset", SqlScalarType::UInt64.nullable(true))
249 .finish()
250 });
251
252 async fn make_test_operator(
253 shard: ShardId,
254 as_of: Antichain<Timestamp>,
255 ) -> (
256 ReclockOperator<
257 kafka::KafkaTimestamp,
258 Timestamp,
259 impl RemapHandle<FromTime = kafka::KafkaTimestamp, IntoTime = Timestamp>,
260 >,
261 ReclockBatch<kafka::KafkaTimestamp, Timestamp>,
262 ) {
263 let metadata = CollectionMetadata {
264 persist_location: PersistLocation {
265 blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
266 consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
267 },
268 data_shard: shard,
269 relation_desc: RelationDesc::empty(),
270 txns_shard: None,
271 };
272
273 let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
274
275 let (_read_only_tx, read_only_rx) = watch::channel(false);
277 let remap_handle = crate::source::reclock::compat::PersistHandle::new(
278 Arc::clone(&*PERSIST_CACHE),
279 read_only_rx,
280 metadata,
281 as_of.clone(),
282 write_frontier,
283 GlobalId::Explain,
284 "unittest",
285 0,
286 1,
287 PROGRESS_DESC.clone(),
288 GlobalId::Explain,
289 )
290 .await
291 .unwrap();
292
293 let (mut operator, mut initial_batch) = ReclockOperator::new(remap_handle).await;
294
295 if *initial_batch.upper == [Timestamp::minimum()] {
297 initial_batch = operator
300 .mint(
301 0.into(),
302 Antichain::from_elem(1.into()),
303 Antichain::from_elem(Partitioned::minimum()).borrow(),
304 )
305 .await;
306 }
307
308 (operator, initial_batch)
309 }
310
311 fn partitioned_frontier<I>(items: I) -> Antichain<kafka::KafkaTimestamp>
315 where
316 I: IntoIterator<Item = (i32, MzOffset)>,
317 {
318 let mut frontier = Antichain::new();
319 let mut prev = RB::NegInfinity;
320 for (pid, offset) in items {
321 assert!(prev < RB::before(pid));
322 let gap = Partitioned::new_range(prev, RB::before(pid), MzOffset::from(0));
323 frontier.extend([gap, Partitioned::new_singleton(RB::exact(pid), offset)]);
324 prev = RB::after(pid);
325 }
326 frontier.insert(Partitioned::new_range(
327 prev,
328 RB::PosInfinity,
329 MzOffset::from(0),
330 ));
331 frontier
332 }
333
334 #[mz_ore::test(tokio::test)]
335 #[cfg_attr(miri, ignore)] async fn test_basic_usage() {
337 let (mut operator, _) =
338 make_test_operator(ShardId::new(), Antichain::from_elem(0.into())).await;
339
340 let source_upper = partitioned_frontier([(0, MzOffset::from(4))]);
342 let mut batch = operator
343 .mint(
344 1000.into(),
345 Antichain::from_elem(1001.into()),
346 source_upper.borrow(),
347 )
348 .await;
349 let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
350 updates: vec![
351 (
352 Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
353 1000.into(),
354 Diff::ONE,
355 ),
356 (
357 Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
358 1000.into(),
359 Diff::ONE,
360 ),
361 (
362 Partitioned::new_range(RB::NegInfinity, RB::PosInfinity, MzOffset::from(0)),
363 1000.into(),
364 Diff::MINUS_ONE,
365 ),
366 (
367 Partitioned::new_singleton(RB::exact(0), MzOffset::from(4)),
368 1000.into(),
369 Diff::ONE,
370 ),
371 ],
372 upper: Antichain::from_elem(Timestamp::from(1001)),
373 };
374 batch.updates.sort();
375 expected_batch.updates.sort();
376 assert_eq!(batch, expected_batch);
377 }
378
379 #[mz_ore::test(tokio::test)]
380 #[cfg_attr(miri, ignore)] async fn test_compaction() {
382 let persist_location = PersistLocation {
383 blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
384 consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
385 };
386
387 let remap_shard = ShardId::new();
388
389 let persist_client = PERSIST_CACHE
390 .open(persist_location)
391 .await
392 .expect("error creating persist client");
393
394 let mut remap_read_handle = persist_client
395 .open_critical_since::<SourceData, (), Timestamp, StorageDiff, u64>(
396 remap_shard,
397 PersistClient::CONTROLLER_CRITICAL_SINCE,
398 Diagnostics::from_purpose("test_since_hold"),
399 )
400 .await
401 .expect("error opening persist shard");
402
403 let (mut operator, _batch) =
404 make_test_operator(remap_shard, Antichain::from_elem(0.into())).await;
405
406 let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
408 operator
409 .mint(
410 1000.into(),
411 Antichain::from_elem(1001.into()),
412 source_upper.borrow(),
413 )
414 .await;
415
416 let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
417 operator
418 .mint(
419 2000.into(),
420 Antichain::from_elem(2001.into()),
421 source_upper.borrow(),
422 )
423 .await;
424
425 remap_read_handle
427 .compare_and_downgrade_since(&0, (&0, &Antichain::from_elem(1000.into())))
428 .await
429 .unwrap();
430
431 let (_operator, mut initial_batch) =
433 make_test_operator(remap_shard, Antichain::from_elem(1000.into())).await;
434
435 let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
436 updates: vec![
437 (
438 Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
439 1000.into(),
440 Diff::ONE,
441 ),
442 (
443 Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
444 1000.into(),
445 Diff::ONE,
446 ),
447 (
448 Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
449 1000.into(),
450 Diff::ONE,
451 ),
452 (
453 Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
454 2000.into(),
455 Diff::MINUS_ONE,
456 ),
457 (
458 Partitioned::new_singleton(RB::exact(0), MzOffset::from(5)),
459 2000.into(),
460 Diff::ONE,
461 ),
462 ],
463 upper: Antichain::from_elem(Timestamp::from(2001)),
464 };
465 expected_batch.updates.sort();
466 initial_batch.updates.sort();
467 assert_eq!(initial_batch, expected_batch);
468 }
469
470 #[mz_ore::test(tokio::test)]
471 #[cfg_attr(miri, ignore)] async fn test_concurrency() {
473 let shared_shard = ShardId::new();
475 let (mut op_a, _) = make_test_operator(shared_shard, Antichain::from_elem(0.into())).await;
476 let (mut op_b, _) = make_test_operator(shared_shard, Antichain::from_elem(0.into())).await;
477
478 let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
480 let mut batch = op_a
481 .mint(
482 1000.into(),
483 Antichain::from_elem(1001.into()),
484 source_upper.borrow(),
485 )
486 .await;
487 let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
488 updates: vec![
489 (
490 Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
491 1000.into(),
492 Diff::ONE,
493 ),
494 (
495 Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
496 1000.into(),
497 Diff::ONE,
498 ),
499 (
500 Partitioned::new_range(RB::NegInfinity, RB::PosInfinity, MzOffset::from(0)),
501 1000.into(),
502 Diff::MINUS_ONE,
503 ),
504 (
505 Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
506 1000.into(),
507 Diff::ONE,
508 ),
509 ],
510 upper: Antichain::from_elem(Timestamp::from(1001)),
511 };
512 batch.updates.sort();
513 expected_batch.updates.sort();
514 assert_eq!(batch, expected_batch);
515
516 let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
519 let mut batch = op_b
520 .mint(
521 11000.into(),
522 Antichain::from_elem(11001.into()),
523 source_upper.borrow(),
524 )
525 .await;
526 expected_batch.updates.extend([
527 (
528 Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
529 11000.into(),
530 Diff::MINUS_ONE,
531 ),
532 (
533 Partitioned::new_singleton(RB::exact(0), MzOffset::from(5)),
534 11000.into(),
535 Diff::ONE,
536 ),
537 ]);
538 expected_batch.upper = Antichain::from_elem(Timestamp::from(11001));
539 batch.updates.sort();
540 expected_batch.updates.sort();
541 assert_eq!(batch, expected_batch);
542 }
543
544 #[mz_ore::test(tokio::test(start_paused = true))]
547 #[cfg_attr(miri, ignore)] async fn test_since_hold() {
549 let binding_shard = ShardId::new();
550
551 let (mut operator, _) =
552 make_test_operator(binding_shard, Antichain::from_elem(0.into())).await;
553
554 tokio::time::advance(PERSIST_READER_LEASE_TIMEOUT_MS / 2 + Duration::from_millis(1)).await;
564 let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
565 let _ = operator
566 .mint(
567 1000.into(),
568 Antichain::from_elem(1001.into()),
569 source_upper.borrow(),
570 )
571 .await;
572
573 tokio::time::advance(PERSIST_READER_LEASE_TIMEOUT_MS / 2 + Duration::from_millis(1)).await;
574 let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
575 let _ = operator
576 .mint(
577 2000.into(),
578 Antichain::from_elem(2001.into()),
579 source_upper.borrow(),
580 )
581 .await;
582
583 tokio::time::sleep(Duration::from_millis(1)).await;
587
588 let (_operator, _batch) =
591 make_test_operator(binding_shard, Antichain::from_elem(0.into())).await;
592
593 let persist_location = PersistLocation {
595 blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
596 consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
597 };
598
599 let persist_client = PERSIST_CACHE
600 .open(persist_location)
601 .await
602 .expect("error creating persist client");
603
604 let read_handle = persist_client
605 .open_leased_reader::<SourceData, (), Timestamp, StorageDiff>(
606 binding_shard,
607 Arc::new(PROGRESS_DESC.clone()),
608 Arc::new(UnitSchema),
609 Diagnostics::from_purpose("test_since_hold"),
610 true,
611 )
612 .await
613 .expect("error opening persist shard");
614
615 assert_eq!(
616 Antichain::from_elem(0.into()),
617 read_handle.since().to_owned()
618 );
619 }
620}