use differential_dataflow::consolidation;
use differential_dataflow::lattice::Lattice;
use mz_persist_client::error::UpperMismatch;
use mz_repr::Diff;
use mz_storage_client::util::remap_handle::RemapHandle;
use timely::order::PartialOrder;
use timely::progress::Timestamp;
use timely::progress::frontier::{Antichain, AntichainRef, MutableAntichain};

pub mod compat;

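/// A `ReclockOperator` observes the progress of a source in its native (`FromTime`) domain and
/// durably associates that progress with timestamps in the target (`IntoTime`) domain by minting
/// bindings into a remap collection through the provided [`RemapHandle`].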
#[derive(Debug)]
pub struct ReclockOperator<
    FromTime: Timestamp,
    IntoTime: Timestamp + Lattice,
    Handle: RemapHandle<FromTime = FromTime, IntoTime = IntoTime>,
> {
    /// Upper frontier of the remap collection, as far as this operator has observed it.
    upper: Antichain<IntoTime>,
    /// The source (`FromTime`) frontier described by the bindings read so far. Minting a new
    /// binding retracts this frontier and asserts the new one.
    source_upper: MutableAntichain<FromTime>,

    /// A handle used to durably append to and read back the remap collection.
    remap_handle: Handle,
}

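/// A batch of updates to the remap collection together with the collection's upper after the
/// batch has been applied.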
#[derive(Clone, Debug, PartialEq)]
pub struct ReclockBatch<FromTime, IntoTime> {
    /// Differential updates describing changes of the source frontier at various `IntoTime`s.
    pub updates: Vec<(FromTime, IntoTime, Diff)>,
    /// The upper of the remap collection after applying `updates`.
    pub upper: Antichain<IntoTime>,
}

impl<FromTime, IntoTime, Handle> ReclockOperator<FromTime, IntoTime, Handle>
where
    FromTime: Timestamp,
    IntoTime: Timestamp + Lattice,
    Handle: RemapHandle<FromTime = FromTime, IntoTime = IntoTime>,
{
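    /// Constructs a new [`ReclockOperator`] from the given remap handle. Returns the operator
    /// together with a [`ReclockBatch`] containing all bindings that already exist in the remap
    /// collection.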
    pub async fn new(remap_handle: Handle) -> (Self, ReclockBatch<FromTime, IntoTime>) {
        let upper = remap_handle.upper().clone();

        let mut operator = Self {
            upper: Antichain::from_elem(IntoTime::minimum()),
            source_upper: MutableAntichain::new(),
            remap_handle,
        };

        // If the remap collection is non-empty, sync to its upper so that the returned batch
        // reflects all previously minted bindings.
        let trace_batch = if upper.elements() != [IntoTime::minimum()] {
            operator.sync(upper.borrow()).await
        } else {
            ReclockBatch {
                updates: vec![],
                upper: Antichain::from_elem(IntoTime::minimum()),
            }
        };

        (operator, trace_batch)
    }

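    /// Syncs the state of this operator with the remap collection until its upper is at least
    /// `target_upper`, returning all bindings read in the process.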
    async fn sync(
        &mut self,
        target_upper: AntichainRef<'_, IntoTime>,
    ) -> ReclockBatch<FromTime, IntoTime> {
        let mut updates: Vec<(FromTime, IntoTime, Diff)> = Vec::new();

        // Read batches from the remap handle until we have caught up with `target_upper`.
        while PartialOrder::less_than(&self.upper.borrow(), &target_upper) {
            let (mut batch, upper) = self
                .remap_handle
                .next()
                .await
                .expect("requested data after empty antichain");
            self.upper = upper;
            updates.append(&mut batch);
        }

        // Fold the newly read updates into `source_upper` so it reflects the source frontier
        // described by the latest binding.
        self.source_upper.update_iter(
            updates
                .iter()
                .map(|(src_ts, _dest_ts, diff)| (src_ts.clone(), diff.into_inner())),
        );

        ReclockBatch {
            updates,
            upper: self.upper.clone(),
        }
    }

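    /// Mints a binding that associates `binding_ts` with the source frontier `new_from_upper` and
    /// advances the upper of the remap collection towards `new_into_upper`. If another worker
    /// appends concurrently, the operator syncs to the current upper and retries. Returns all
    /// updates that were written or observed along the way.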
    pub async fn mint(
        &mut self,
        binding_ts: IntoTime,
        mut new_into_upper: Antichain<IntoTime>,
        new_from_upper: AntichainRef<'_, FromTime>,
    ) -> ReclockBatch<FromTime, IntoTime> {
        // `binding_ts` must lie strictly before `new_into_upper`.
        assert!(!new_into_upper.less_equal(&binding_ts));
        let mut batch = ReclockBatch {
            updates: vec![],
            upper: self.upper.clone(),
        };

        while *self.upper == [IntoTime::minimum()]
            || (PartialOrder::less_equal(&self.source_upper.frontier(), &new_from_upper)
                && PartialOrder::less_than(&self.upper, &new_into_upper)
                && self.upper.less_equal(&binding_ts))
        {
            // If the source has finished, the remap collection can be closed as well.
            if new_from_upper.is_empty() {
                new_into_upper = Antichain::new();
            }

            // The initial binding is always minted at the minimum `IntoTime` so that the remap
            // collection starts at the beginning of time.
            let binding_ts = if *self.upper == [IntoTime::minimum()] {
                IntoTime::minimum()
            } else {
                binding_ts.clone()
            };

            // Retract the previous source frontier and assert the new one at `binding_ts`.
            let mut updates = vec![];
            for src_ts in self.source_upper.frontier().iter().cloned() {
                updates.push((src_ts, binding_ts.clone(), Diff::MINUS_ONE));
            }
            for src_ts in new_from_upper.iter().cloned() {
                updates.push((src_ts, binding_ts.clone(), Diff::ONE));
            }
            consolidation::consolidate_updates(&mut updates);

            // Attempt to append; if another worker raced ahead, sync to its upper and retry.
            let new_batch = match self.append_batch(updates, &new_into_upper).await {
                Ok(trace_batch) => trace_batch,
                Err(UpperMismatch { current, .. }) => self.sync(current.borrow()).await,
            };
            batch.updates.extend(new_batch.updates);
            batch.upper = new_batch.upper;
        }

        batch
    }

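    /// Appends `updates` to the remap collection, expecting its current upper to be `self.upper`,
    /// and advances the upper to `new_upper`. On success the operator is synced up to the new
    /// upper; on an upper mismatch the error is returned to the caller so it can sync and retry.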
    async fn append_batch(
        &mut self,
        updates: Vec<(FromTime, IntoTime, Diff)>,
        new_upper: &Antichain<IntoTime>,
    ) -> Result<ReclockBatch<FromTime, IntoTime>, UpperMismatch<IntoTime>> {
        match self
            .remap_handle
            .compare_and_append(updates, self.upper.clone(), new_upper.clone())
            .await
        {
            // Sync to the new upper so that `self.upper` and `source_upper` reflect the batch we
            // just appended.
            Ok(()) => Ok(self.sync(new_upper.borrow()).await),
            Err(mismatch) => Err(mismatch),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::cell::RefCell;
    use std::rc::Rc;
    use std::str::FromStr;
    use std::sync::Arc;
    use std::sync::LazyLock;
    use std::time::Duration;

    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{GlobalId, RelationDesc, ScalarType, Timestamp};
    use mz_storage_client::util::remap_handle::RemapHandle;
    use mz_storage_types::StorageDiff;
    use mz_storage_types::controller::CollectionMetadata;
    use mz_storage_types::sources::kafka::{self, RangeBound as RB};
    use mz_storage_types::sources::{MzOffset, SourceData};
    use mz_timely_util::order::Partitioned;
    use timely::progress::Timestamp as _;
    use tokio::sync::watch;

    use super::*;

    static PERSIST_READER_LEASE_TIMEOUT_MS: Duration = Duration::from_secs(60 * 15);

    static PERSIST_CACHE: LazyLock<Arc<PersistClientCache>> = LazyLock::new(|| {
        let persistcfg =
            PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
        persistcfg.set_reader_lease_duration(PERSIST_READER_LEASE_TIMEOUT_MS);
        Arc::new(PersistClientCache::new(
            persistcfg,
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        ))
    });

    static PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
        RelationDesc::builder()
            .with_column(
                "partition",
                ScalarType::Range {
                    element_type: Box::new(ScalarType::Numeric { max_scale: None }),
                }
                .nullable(false),
            )
            .with_column("offset", ScalarType::UInt64.nullable(true))
            .finish()
    });

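    /// Constructs a `ReclockOperator` for tests, backed by a persist remap handle on the given
    /// shard, and returns it together with the initial batch of bindings. If the shard is empty,
    /// an initial binding is minted so that tests start from a non-trivial remap collection.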
    async fn make_test_operator(
        shard: ShardId,
        as_of: Antichain<Timestamp>,
    ) -> (
        ReclockOperator<
            kafka::KafkaTimestamp,
            Timestamp,
            impl RemapHandle<FromTime = kafka::KafkaTimestamp, IntoTime = Timestamp>,
        >,
        ReclockBatch<kafka::KafkaTimestamp, Timestamp>,
    ) {
        let metadata = CollectionMetadata {
            persist_location: PersistLocation {
                blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            },
            remap_shard: Some(shard),
            data_shard: ShardId::new(),
            relation_desc: RelationDesc::empty(),
            txns_shard: None,
        };

        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));

        let (_read_only_tx, read_only_rx) = watch::channel(false);
        let remap_handle = crate::source::reclock::compat::PersistHandle::new(
            Arc::clone(&*PERSIST_CACHE),
            read_only_rx,
            metadata,
            as_of.clone(),
            write_frontier,
            GlobalId::Explain,
            "unittest",
            0,
            1,
            PROGRESS_DESC.clone(),
            GlobalId::Explain,
        )
        .await
        .unwrap();

        let (mut operator, mut initial_batch) = ReclockOperator::new(remap_handle).await;

        // If this is a new remap shard, mint an initial binding at timestamp 0 that covers the
        // minimum source frontier.
        if *initial_batch.upper == [Timestamp::minimum()] {
            initial_batch = operator
                .mint(
                    0.into(),
                    Antichain::from_elem(1.into()),
                    Antichain::from_elem(Partitioned::minimum()).borrow(),
                )
                .await;
        }

        (operator, initial_batch)
    }

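    /// Builds a `FromTime` frontier from `(partition, offset)` pairs, filling the gaps between and
    /// around the mentioned partitions with ranges at offset 0.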
    fn partitioned_frontier<I>(items: I) -> Antichain<kafka::KafkaTimestamp>
    where
        I: IntoIterator<Item = (i32, MzOffset)>,
    {
        let mut frontier = Antichain::new();
        let mut prev = RB::NegInfinity;
        for (pid, offset) in items {
            assert!(prev < RB::before(pid));
            let gap = Partitioned::new_range(prev, RB::before(pid), MzOffset::from(0));
            frontier.extend([gap, Partitioned::new_singleton(RB::exact(pid), offset)]);
            prev = RB::after(pid);
        }
        frontier.insert(Partitioned::new_range(
            prev,
            RB::PosInfinity,
            MzOffset::from(0),
        ));
        frontier
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_basic_usage() {
        let (mut operator, _) =
            make_test_operator(ShardId::new(), Antichain::from_elem(0.into())).await;

        // Mint a binding at timestamp 1000 for a source frontier where partition 0 has advanced
        // to offset 4.
        let source_upper = partitioned_frontier([(0, MzOffset::from(4))]);
        let mut batch = operator
            .mint(
                1000.into(),
                Antichain::from_elem(1001.into()),
                source_upper.borrow(),
            )
            .await;
        let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
            updates: vec![
                (
                    Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::NegInfinity, RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::MINUS_ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(4)),
                    1000.into(),
                    Diff::ONE,
                ),
            ],
            upper: Antichain::from_elem(Timestamp::from(1001)),
        };
        batch.updates.sort();
        expected_batch.updates.sort();
        assert_eq!(batch, expected_batch);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_compaction() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };

        let remap_shard = ShardId::new();

        let persist_client = PERSIST_CACHE
            .open(persist_location)
            .await
            .expect("error creating persist client");

        let mut remap_read_handle = persist_client
            .open_critical_since::<SourceData, (), Timestamp, StorageDiff, u64>(
                remap_shard,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics::from_purpose("test_since_hold"),
            )
            .await
            .expect("error opening persist shard");

        let (mut operator, _batch) =
            make_test_operator(remap_shard, Antichain::from_elem(0.into())).await;

        // Mint two bindings: offset 3 at timestamp 1000 and offset 5 at timestamp 2000.
        let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
        operator
            .mint(
                1000.into(),
                Antichain::from_elem(1001.into()),
                source_upper.borrow(),
            )
            .await;

        let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
        operator
            .mint(
                2000.into(),
                Antichain::from_elem(2001.into()),
                source_upper.borrow(),
            )
            .await;

        // Allow the remap shard to compact up to timestamp 1000 by downgrading its critical
        // since.
        remap_read_handle
            .compare_and_downgrade_since(&0, (&0, &Antichain::from_elem(1000.into())))
            .await
            .unwrap();

        // A new operator created with an as_of of 1000 should observe the bindings consolidated
        // up to that time.
        let (_operator, mut initial_batch) =
            make_test_operator(remap_shard, Antichain::from_elem(1000.into())).await;

        let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
            updates: vec![
                (
                    Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                    2000.into(),
                    Diff::MINUS_ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(5)),
                    2000.into(),
                    Diff::ONE,
                ),
            ],
            upper: Antichain::from_elem(Timestamp::from(2001)),
        };
        expected_batch.updates.sort();
        initial_batch.updates.sort();
        assert_eq!(initial_batch, expected_batch);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_concurrency() {
        // Create two operators pointing to the same remap shard.
        let shared_shard = ShardId::new();
        let (mut op_a, _) = make_test_operator(shared_shard, Antichain::from_elem(0.into())).await;
        let (mut op_b, _) = make_test_operator(shared_shard, Antichain::from_elem(0.into())).await;

        // Mint a binding through operator A.
        let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
        let mut batch = op_a
            .mint(
                1000.into(),
                Antichain::from_elem(1001.into()),
                source_upper.borrow(),
            )
            .await;
        let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
            updates: vec![
                (
                    Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::NegInfinity, RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::MINUS_ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                    1000.into(),
                    Diff::ONE,
                ),
            ],
            upper: Antichain::from_elem(Timestamp::from(1001)),
        };
        batch.updates.sort();
        expected_batch.updates.sort();
        assert_eq!(batch, expected_batch);

        // Mint a further binding through operator B, which must first catch up with the bindings
        // minted by operator A, so its batch contains those updates as well.
        let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
        let mut batch = op_b
            .mint(
                11000.into(),
                Antichain::from_elem(11001.into()),
                source_upper.borrow(),
            )
            .await;
        expected_batch.updates.extend([
            (
                Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                11000.into(),
                Diff::MINUS_ONE,
            ),
            (
                Partitioned::new_singleton(RB::exact(0), MzOffset::from(5)),
                11000.into(),
                Diff::ONE,
            ),
        ]);
        expected_batch.upper = Antichain::from_elem(Timestamp::from(11001));
        batch.updates.sort();
        expected_batch.updates.sort();
        assert_eq!(batch, expected_batch);
    }

    #[mz_ore::test(tokio::test(start_paused = true))]
    #[cfg_attr(miri, ignore)]
    async fn test_since_hold() {
        let binding_shard = ShardId::new();

        let (mut operator, _) =
            make_test_operator(binding_shard, Antichain::from_elem(0.into())).await;

        // Advance time past the persist reader lease timeout, interleaved with normal operator
        // activity, and then verify below that the since of the remap shard is still held back at
        // the initial as_of.
        tokio::time::advance(PERSIST_READER_LEASE_TIMEOUT_MS / 2 + Duration::from_millis(1)).await;
        let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
        let _ = operator
            .mint(
                1000.into(),
                Antichain::from_elem(1001.into()),
                source_upper.borrow(),
            )
            .await;

        tokio::time::advance(PERSIST_READER_LEASE_TIMEOUT_MS / 2 + Duration::from_millis(1)).await;
        let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
        let _ = operator
            .mint(
                2000.into(),
                Antichain::from_elem(2001.into()),
                source_upper.borrow(),
            )
            .await;

        // Yield so that any background persist maintenance gets a chance to run.
        tokio::time::sleep(Duration::from_millis(1)).await;

        // Creating a new operator at the initial as_of must still succeed, i.e. the since hold at
        // timestamp 0 must not have been released.
        let (_operator, _batch) =
            make_test_operator(binding_shard, Antichain::from_elem(0.into())).await;

        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };

        let persist_client = PERSIST_CACHE
            .open(persist_location)
            .await
            .expect("error creating persist client");

        let read_handle = persist_client
            .open_leased_reader::<SourceData, (), Timestamp, StorageDiff>(
                binding_shard,
                Arc::new(PROGRESS_DESC.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("test_since_hold"),
                true,
            )
            .await
            .expect("error opening persist shard");

        assert_eq!(
            Antichain::from_elem(0.into()),
            read_handle.since().to_owned()
        );
    }
}