1use differential_dataflow::consolidation;
14use differential_dataflow::lattice::Lattice;
15use mz_persist_client::error::UpperMismatch;
16use mz_repr::Diff;
17use mz_storage_client::util::remap_handle::RemapHandle;
18use timely::order::PartialOrder;
19use timely::progress::Timestamp;
20use timely::progress::frontier::{Antichain, AntichainRef, MutableAntichain};
21
22pub mod compat;
23
#[derive(Debug)]
/// An operator that maintains the mapping ("remap bindings") between a source's native
/// `FromTime` domain and the system's `IntoTime` domain, durably persisted through a
/// [`RemapHandle`].
pub struct ReclockOperator<
    FromTime: Timestamp,
    IntoTime: Timestamp + Lattice,
    Handle: RemapHandle<FromTime = FromTime, IntoTime = IntoTime>,
> {
    /// The upper frontier of the remap collection as far as this operator has read it.
    upper: Antichain<IntoTime>,
    /// The `FromTime` frontier described by the bindings read so far, maintained
    /// incrementally from the diffs returned by the remap handle.
    source_upper: MutableAntichain<FromTime>,

    /// Durable handle used to read existing bindings and append new ones.
    remap_handle: Handle,
}
50
#[derive(Clone, Debug, PartialEq)]
/// A batch of remap-binding updates, together with the `IntoTime` frontier up to
/// which the bindings are complete.
pub struct ReclockBatch<FromTime, IntoTime> {
    /// Binding updates as `(source time, target time, diff)` triples.
    pub updates: Vec<(FromTime, IntoTime, Diff)>,
    /// The `IntoTime` upper of the remap collection after applying `updates`.
    pub upper: Antichain<IntoTime>,
}
56
impl<FromTime, IntoTime, Handle> ReclockOperator<FromTime, IntoTime, Handle>
where
    FromTime: Timestamp,
    IntoTime: Timestamp + Lattice,
    Handle: RemapHandle<FromTime = FromTime, IntoTime = IntoTime>,
{
    /// Constructs a new [`ReclockOperator`] from the given remap handle.
    ///
    /// Returns the operator together with a [`ReclockBatch`] containing all bindings
    /// that already exist in the remap collection: if the handle's upper is past
    /// `IntoTime::minimum()` the operator syncs up to it and replays those updates;
    /// otherwise the batch is empty with a minimum upper.
    pub async fn new(remap_handle: Handle) -> (Self, ReclockBatch<FromTime, IntoTime>) {
        let upper = remap_handle.upper().clone();

        let mut operator = Self {
            // Start from the minimum and let `sync` catch us up to the handle's upper.
            upper: Antichain::from_elem(IntoTime::minimum()),
            source_upper: MutableAntichain::new(),
            remap_handle,
        };

        // Replay any pre-existing bindings so callers observe the full remap trace.
        let trace_batch = if upper.elements() != [IntoTime::minimum()] {
            operator.sync(upper.borrow()).await
        } else {
            ReclockBatch {
                updates: vec![],
                upper: Antichain::from_elem(IntoTime::minimum()),
            }
        };

        (operator, trace_batch)
    }

    /// Advances this operator's view of the remap collection until `self.upper` is at
    /// least `target_upper`, reading batches from the remap handle.
    ///
    /// All updates read are folded into `self.source_upper` and returned to the caller
    /// as a single [`ReclockBatch`].
    ///
    /// # Panics
    /// Panics if the handle's stream ends (returns `None`) before `target_upper` is
    /// reached, i.e. if data is requested beyond the empty antichain.
    async fn sync(
        &mut self,
        target_upper: AntichainRef<'_, IntoTime>,
    ) -> ReclockBatch<FromTime, IntoTime> {
        let mut updates: Vec<(FromTime, IntoTime, Diff)> = Vec::new();

        // Pull batches until our upper has caught up with the target.
        while PartialOrder::less_than(&self.upper.borrow(), &target_upper) {
            let (mut batch, upper) = self
                .remap_handle
                .next()
                .await
                .expect("requested data after empty antichain");
            self.upper = upper;
            updates.append(&mut batch);
        }

        // Maintain the FromTime frontier described by the bindings read so far.
        self.source_upper.update_iter(
            updates
                .iter()
                .map(|(src_ts, _dest_ts, diff)| (src_ts.clone(), diff.into_inner())),
        );

        ReclockBatch {
            updates,
            upper: self.upper.clone(),
        }
    }

    /// Mints new remap bindings at `binding_ts` that advance the recorded `FromTime`
    /// frontier to `new_from_upper`, and advances the remap collection's upper to
    /// `new_into_upper`.
    ///
    /// The loop keeps attempting the append until either the bindings are durably
    /// recorded or the recorded state already covers the request: an `UpperMismatch`
    /// from a concurrent writer is handled by syncing to the current upper and
    /// re-evaluating the loop condition.
    ///
    /// Returns a batch of all updates (both those minted here and any discovered
    /// while syncing after a mismatch).
    ///
    /// # Panics
    /// Panics if `binding_ts` is not strictly below `new_into_upper`.
    pub async fn mint(
        &mut self,
        binding_ts: IntoTime,
        mut new_into_upper: Antichain<IntoTime>,
        new_from_upper: AntichainRef<'_, FromTime>,
    ) -> ReclockBatch<FromTime, IntoTime> {
        assert!(!new_into_upper.less_equal(&binding_ts));
        let mut batch = ReclockBatch {
            updates: vec![],
            upper: self.upper.clone(),
        };

        // Keep minting while: the collection has never been written to, OR the
        // requested source frontier is still ahead of (or equal to) what we have
        // recorded AND both `new_into_upper` and `binding_ts` are still ahead of
        // our upper, so an append can succeed.
        while *self.upper == [IntoTime::minimum()]
            || (PartialOrder::less_equal(&self.source_upper.frontier(), &new_from_upper)
                && PartialOrder::less_than(&self.upper, &new_into_upper)
                && self.upper.less_equal(&binding_ts))
        {
            // An empty source frontier means the source is finished; close the
            // remap collection by advancing its upper to the empty antichain.
            if new_from_upper.is_empty() {
                new_into_upper = Antichain::new();
            }

            // The very first binding is minted at the minimum timestamp so the
            // initial state of the collection is defined from the beginning of
            // time; subsequent bindings use the caller-provided `binding_ts`.
            let binding_ts = if *self.upper == [IntoTime::minimum()] {
                IntoTime::minimum()
            } else {
                binding_ts.clone()
            };

            // Retract the previously recorded source frontier and assert the new
            // one, both at `binding_ts`; consolidation cancels unchanged elements.
            let mut updates = vec![];
            for src_ts in self.source_upper.frontier().iter().cloned() {
                updates.push((src_ts, binding_ts.clone(), Diff::MINUS_ONE));
            }
            for src_ts in new_from_upper.iter().cloned() {
                updates.push((src_ts, binding_ts.clone(), Diff::ONE));
            }
            consolidation::consolidate_updates(&mut updates);

            let new_batch = match self.append_batch(updates, &new_into_upper).await {
                Ok(trace_batch) => trace_batch,
                // Someone else wrote concurrently; absorb their bindings and retry.
                Err(UpperMismatch { current, .. }) => self.sync(current.borrow()).await,
            };
            batch.updates.extend(new_batch.updates);
            batch.upper = new_batch.upper;
        }

        batch
    }

    /// Appends `updates` to the remap collection, expecting the current upper to be
    /// `self.upper` and advancing it to `new_upper`.
    ///
    /// On success, syncs the local state up to `new_upper` and returns the resulting
    /// batch (which includes the just-appended updates). On an upper mismatch the
    /// error is returned unchanged for the caller to handle.
    async fn append_batch(
        &mut self,
        updates: Vec<(FromTime, IntoTime, Diff)>,
        new_upper: &Antichain<IntoTime>,
    ) -> Result<ReclockBatch<FromTime, IntoTime>, UpperMismatch<IntoTime>> {
        match self
            .remap_handle
            .compare_and_append(updates, self.upper.clone(), new_upper.clone())
            .await
        {
            Ok(()) => Ok(self.sync(new_upper.borrow()).await),
            Err(mismatch) => Err(mismatch),
        }
    }
}
195
#[cfg(test)]
mod tests {
    use std::cell::RefCell;
    use std::rc::Rc;
    use std::str::FromStr;
    use std::sync::Arc;
    use std::sync::LazyLock;
    use std::time::Duration;

    use super::*;
    use mz_build_info::DUMMY_BUILD_INFO;
    use mz_ore::metrics::MetricsRegistry;
    use mz_ore::now::SYSTEM_TIME;
    use mz_ore::url::SensitiveUrl;
    use mz_persist_client::cache::PersistClientCache;
    use mz_persist_client::cfg::PersistConfig;
    use mz_persist_client::critical::Opaque;
    use mz_persist_client::rpc::PubSubClientConnection;
    use mz_persist_client::{Diagnostics, PersistClient, PersistLocation, ShardId};
    use mz_persist_types::codec_impls::UnitSchema;
    use mz_repr::{GlobalId, RelationDesc, SqlScalarType, Timestamp};
    use mz_storage_client::util::remap_handle::RemapHandle;
    use mz_storage_types::StorageDiff;
    use mz_storage_types::controller::CollectionMetadata;
    use mz_storage_types::sources::kafka::{self, RangeBound as RB};
    use mz_storage_types::sources::{MzOffset, SourceData};
    use mz_timely_util::order::Partitioned;
    use timely::progress::Timestamp as _;
    use tokio::sync::watch;

    // Lease duration configured for the shared persist cache; test_since_hold
    // advances tokio's paused clock past fractions of this to exercise lease expiry.
    static PERSIST_READER_LEASE_TIMEOUT_MS: Duration = Duration::from_secs(60 * 15);

    // Process-wide in-memory persist cache shared by all tests in this module.
    static PERSIST_CACHE: LazyLock<Arc<PersistClientCache>> = LazyLock::new(|| {
        let persistcfg = PersistConfig::new_default_configs(&DUMMY_BUILD_INFO, SYSTEM_TIME.clone());
        persistcfg.set_reader_lease_duration(PERSIST_READER_LEASE_TIMEOUT_MS);
        Arc::new(PersistClientCache::new(
            persistcfg,
            &MetricsRegistry::new(),
            |_, _| PubSubClientConnection::noop(),
        ))
    });

    // Relation schema of the remap shard: a numeric-range "partition" column and a
    // nullable uint64 "offset" column.
    static PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
        RelationDesc::builder()
            .with_column(
                "partition",
                SqlScalarType::Range {
                    element_type: Box::new(SqlScalarType::Numeric { max_scale: None }),
                }
                .nullable(false),
            )
            .with_column("offset", SqlScalarType::UInt64.nullable(true))
            .finish()
    });

    /// Builds a [`ReclockOperator`] over an in-memory persist shard for tests.
    ///
    /// If the shard is brand new (initial batch upper is still the minimum), a first
    /// binding is minted at timestamp 0 with into-upper 1 so every test starts from
    /// an initialized remap collection. Returns the operator and the initial batch.
    async fn make_test_operator(
        shard: ShardId,
        as_of: Antichain<Timestamp>,
    ) -> (
        ReclockOperator<
            kafka::KafkaTimestamp,
            Timestamp,
            impl RemapHandle<FromTime = kafka::KafkaTimestamp, IntoTime = Timestamp>,
        >,
        ReclockBatch<kafka::KafkaTimestamp, Timestamp>,
    ) {
        let metadata = CollectionMetadata {
            persist_location: PersistLocation {
                blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
                consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            },
            data_shard: shard,
            relation_desc: RelationDesc::empty(),
            txns_shard: None,
        };

        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));

        // Writable handle: the read-only flag stays false for the test's lifetime.
        let (_read_only_tx, read_only_rx) = watch::channel(false);
        let remap_handle = crate::source::reclock::compat::PersistHandle::new(
            Arc::clone(&*PERSIST_CACHE),
            read_only_rx,
            metadata,
            as_of.clone(),
            write_frontier,
            GlobalId::Explain,
            "unittest",
            0,
            1,
            PROGRESS_DESC.clone(),
            GlobalId::Explain,
        )
        .await
        .unwrap();

        let (mut operator, mut initial_batch) = ReclockOperator::new(remap_handle).await;

        // Fresh shard: mint the initial binding so the collection is non-empty.
        if *initial_batch.upper == [Timestamp::minimum()] {
            initial_batch = operator
                .mint(
                    0.into(),
                    Antichain::from_elem(1.into()),
                    Antichain::from_elem(Partitioned::minimum()).borrow(),
                )
                .await;
        }

        (operator, initial_batch)
    }

    /// Builds a partitioned Kafka frontier from `(partition id, offset)` pairs.
    ///
    /// For each partition it emits a zero-offset range element covering the gap
    /// before the partition plus a singleton element at the given offset, and
    /// finishes with a zero-offset range up to positive infinity. Partition ids
    /// must be strictly increasing (asserted).
    fn partitioned_frontier<I>(items: I) -> Antichain<kafka::KafkaTimestamp>
    where
        I: IntoIterator<Item = (i32, MzOffset)>,
    {
        let mut frontier = Antichain::new();
        let mut prev = RB::NegInfinity;
        for (pid, offset) in items {
            assert!(prev < RB::before(pid));
            let gap = Partitioned::new_range(prev, RB::before(pid), MzOffset::from(0));
            frontier.extend([gap, Partitioned::new_singleton(RB::exact(pid), offset)]);
            prev = RB::after(pid);
        }
        frontier.insert(Partitioned::new_range(
            prev,
            RB::PosInfinity,
            MzOffset::from(0),
        ));
        frontier
    }

    // Minting a binding for partition 0 / offset 4 at ts 1000 must retract the
    // initial full-range binding and assert the split ranges plus the singleton.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_basic_usage() {
        let (mut operator, _) =
            make_test_operator(ShardId::new(), Antichain::from_elem(0.into())).await;

        let source_upper = partitioned_frontier([(0, MzOffset::from(4))]);
        let mut batch = operator
            .mint(
                1000.into(),
                Antichain::from_elem(1001.into()),
                source_upper.borrow(),
            )
            .await;
        let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
            updates: vec![
                (
                    Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::NegInfinity, RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::MINUS_ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(4)),
                    1000.into(),
                    Diff::ONE,
                ),
            ],
            upper: Antichain::from_elem(Timestamp::from(1001)),
        };
        // Update order is not significant; compare sorted.
        batch.updates.sort();
        expected_batch.updates.sort();
        assert_eq!(batch, expected_batch);
    }

    // After compacting the remap shard's since to 1000, a fresh operator reading
    // as-of 1000 should observe bindings consolidated up to that time.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_compaction() {
        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };

        let remap_shard = ShardId::new();

        let persist_client = PERSIST_CACHE
            .open(persist_location)
            .await
            .expect("error creating persist client");

        // Critical since handle used below to force compaction of the shard.
        let mut remap_read_handle = persist_client
            .open_critical_since::<SourceData, (), Timestamp, StorageDiff>(
                remap_shard,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Opaque::encode(&0u64),
                Diagnostics::from_purpose("test_since_hold"),
            )
            .await
            .expect("error opening persist shard");

        let (mut operator, _batch) =
            make_test_operator(remap_shard, Antichain::from_elem(0.into())).await;

        // Mint two generations of bindings: offset 3 at ts 1000, offset 5 at ts 2000.
        let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
        operator
            .mint(
                1000.into(),
                Antichain::from_elem(1001.into()),
                source_upper.borrow(),
            )
            .await;

        let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
        operator
            .mint(
                2000.into(),
                Antichain::from_elem(2001.into()),
                source_upper.borrow(),
            )
            .await;

        // Compact: downgrade the critical since to 1000.
        remap_read_handle
            .compare_and_downgrade_since(
                &Opaque::encode(&0u64),
                (&Opaque::encode(&0u64), &Antichain::from_elem(1000.into())),
            )
            .await
            .unwrap();

        // Reading as-of 1000 replays the compacted history.
        let (_operator, mut initial_batch) =
            make_test_operator(remap_shard, Antichain::from_elem(1000.into())).await;

        let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
            updates: vec![
                (
                    Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                    2000.into(),
                    Diff::MINUS_ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(5)),
                    2000.into(),
                    Diff::ONE,
                ),
            ],
            upper: Antichain::from_elem(Timestamp::from(2001)),
        };
        expected_batch.updates.sort();
        initial_batch.updates.sort();
        assert_eq!(initial_batch, expected_batch);
    }

    // Two operators writing to the same shard: the second operator's mint must
    // absorb the first's bindings (via the UpperMismatch/sync path) and still
    // produce a consistent combined batch.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_concurrency() {
        let shared_shard = ShardId::new();
        let (mut op_a, _) = make_test_operator(shared_shard, Antichain::from_elem(0.into())).await;
        let (mut op_b, _) = make_test_operator(shared_shard, Antichain::from_elem(0.into())).await;

        // Operator A mints offset 3 at ts 1000.
        let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
        let mut batch = op_a
            .mint(
                1000.into(),
                Antichain::from_elem(1001.into()),
                source_upper.borrow(),
            )
            .await;
        let mut expected_batch: ReclockBatch<_, Timestamp> = ReclockBatch {
            updates: vec![
                (
                    Partitioned::new_range(RB::NegInfinity, RB::before(0), MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::after(0), RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::ONE,
                ),
                (
                    Partitioned::new_range(RB::NegInfinity, RB::PosInfinity, MzOffset::from(0)),
                    1000.into(),
                    Diff::MINUS_ONE,
                ),
                (
                    Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                    1000.into(),
                    Diff::ONE,
                ),
            ],
            upper: Antichain::from_elem(Timestamp::from(1001)),
        };
        batch.updates.sort();
        expected_batch.updates.sort();
        assert_eq!(batch, expected_batch);

        // Operator B (unaware of A's write) mints offset 5 at ts 11000; its batch
        // must include A's bindings plus the incremental change to offset 5.
        let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
        let mut batch = op_b
            .mint(
                11000.into(),
                Antichain::from_elem(11001.into()),
                source_upper.borrow(),
            )
            .await;
        expected_batch.updates.extend([
            (
                Partitioned::new_singleton(RB::exact(0), MzOffset::from(3)),
                11000.into(),
                Diff::MINUS_ONE,
            ),
            (
                Partitioned::new_singleton(RB::exact(0), MzOffset::from(5)),
                11000.into(),
                Diff::ONE,
            ),
        ]);
        expected_batch.upper = Antichain::from_elem(Timestamp::from(11001));
        batch.updates.sort();
        expected_batch.updates.sort();
        assert_eq!(batch, expected_batch);
    }

    // With a paused tokio clock, advance time past the reader lease timeout while
    // the operator keeps minting, then verify the shard's since is still held at 0
    // — i.e. the operator's activity keeps the read hold alive.
    // NOTE(review): currently `#[ignore]`d — presumably flaky or slow; confirm
    // before relying on it.
    #[mz_ore::test(tokio::test(start_paused = true))]
    #[cfg_attr(miri, ignore)]
    #[ignore]
    async fn test_since_hold() {
        let binding_shard = ShardId::new();

        let (mut operator, _) =
            make_test_operator(binding_shard, Antichain::from_elem(0.into())).await;

        // First half-lease elapses, then a mint keeps the handle active.
        tokio::time::advance(PERSIST_READER_LEASE_TIMEOUT_MS / 2 + Duration::from_millis(1)).await;
        let source_upper = partitioned_frontier([(0, MzOffset::from(3))]);
        let _ = operator
            .mint(
                1000.into(),
                Antichain::from_elem(1001.into()),
                source_upper.borrow(),
            )
            .await;

        // Second half-lease elapses; total elapsed now exceeds the lease timeout.
        tokio::time::advance(PERSIST_READER_LEASE_TIMEOUT_MS / 2 + Duration::from_millis(1)).await;
        let source_upper = partitioned_frontier([(0, MzOffset::from(5))]);
        let _ = operator
            .mint(
                2000.into(),
                Antichain::from_elem(2001.into()),
                source_upper.borrow(),
            )
            .await;

        // Yield so background lease-expiry work (if any) gets a chance to run.
        tokio::time::sleep(Duration::from_millis(1)).await;

        // A fresh operator at as-of 0 must still be constructible.
        let (_operator, _batch) =
            make_test_operator(binding_shard, Antichain::from_elem(0.into())).await;

        let persist_location = PersistLocation {
            blob_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
            consensus_uri: SensitiveUrl::from_str("mem://").expect("invalid URL"),
        };

        let persist_client = PERSIST_CACHE
            .open(persist_location)
            .await
            .expect("error creating persist client");

        let read_handle = persist_client
            .open_leased_reader::<SourceData, (), Timestamp, StorageDiff>(
                binding_shard,
                Arc::new(PROGRESS_DESC.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("test_since_hold"),
                true,
            )
            .await
            .expect("error opening persist shard");

        // The shard's since must not have advanced past 0.
        assert_eq!(
            Antichain::from_elem(0.into()),
            read_handle.since().to_owned()
        );
    }
}