1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Timely operators for the crate
use std::any::Any;
use std::fmt::Debug;
use std::future::Future;
use std::sync::mpsc::TryRecvError;
use std::sync::{mpsc, Arc};
use std::time::Duration;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::Hashable;
use futures::StreamExt;
use mz_dyncfg::{Config, ConfigSet, ConfigUpdates};
use mz_ore::cast::CastFrom;
use mz_ore::task::JoinHandleExt;
use mz_persist_client::cfg::{RetryParameters, USE_GLOBAL_TXN_CACHE_SOURCE};
use mz_persist_client::operators::shard_source::{shard_source, SnapshotMode};
use mz_persist_client::project::ProjectionPushdown;
use mz_persist_client::{Diagnostics, PersistClient, ShardId};
use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
use mz_persist_types::txn::TxnsCodec;
use mz_persist_types::{Codec, Codec64, StepForward};
use mz_timely_util::builder_async::{
AsyncInputHandle, Event as AsyncEvent, InputConnection,
OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::capture::Event;
use timely::dataflow::operators::{Broadcast, Capture, Leave, Map, Probe};
use timely::dataflow::{ProbeHandle, Scope, Stream};
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use timely::worker::Worker;
use timely::{Data, PartialOrder, WorkerConfig};
use tracing::debug;
use crate::txn_cache::TxnsCache;
use crate::txn_read::{DataListenNext, DataRemapEntry, TxnsRead};
use crate::TxnsCodecDefault;
/// Translates the physical frontier of a data shard into its logical one.
///
/// A data shard that belongs to the txns set logically advances its upper
/// every time any txn commits, but its physical upper only moves when a txn
/// actually writes to that shard. A plain shard_source (or any other read)
/// aimed at such a shard would therefore appear to stall at the time of the
/// shard's most recent write. This operator fixes that for shard_source: route
/// the shard_source output through `txns_progress` and the resulting
/// frontier/capability keeps advancing along with the txns shard, as long as
/// the shard_source has caught up with the latest committed write to that data
/// shard.
///
/// Example:
///
/// - A data shard was most recently written at 3.
/// - The txns shard's upper is 6.
/// - A dataflow is rendered with a shard_source whose as_of is 5.
/// - A txn that does NOT touch the data shard commits at 7.
/// - A txn that DOES touch the data shard commits at 9.
///
/// How it works:
///
/// - The shard_source operator is rendered, and its single output is wired as
///   a _disconnected_ input into txns_progress. txns_progress has a single
///   output of the same type, which downstream operators consume. Each
///   txns_progress operator targets exactly one data shard; a shard_source for
///   a second data shard needs its own txns_progress operator.
/// - The shard_source emits data through 3 and advances its frontier.
/// - txns_progress forwards those writes and frontier advancements unchanged.
///   (Reading a data shard "normally" is always correct; it just might stall.)
///   Because txns_progress knows there are no writes in `[3,5]`, it then
///   downgrades its own capability past 5 (to 6). Since the input is
///   disconnected, the overall output frontier is now 6.
/// - txns_progress learns about the commit at 7 (the upper is now 8). The data
///   shard was not involved, so the capability can be downgraded to 8.
/// - txns_progress learns about the commit at 9 (the upper is now 10). The
///   data shard _WAS_ involved, so the operator keeps forwarding input data
///   until the input itself has progressed to 10, and only then downgrades to
///   10.
///
/// Internally this renders two operators: a source producing
/// [`DataRemapEntry`] updates (backed either by the process-global
/// [`TxnsContext`] or by an operator-local [`TxnsCache`], depending on
/// `USE_GLOBAL_TXN_CACHE_SOURCE`), and a frontiers operator that receives a
/// broadcast copy of those updates on every worker and uses them to advance
/// the passthrough output. The returned [`PressOnDropButton`]s are the
/// shutdown buttons of those two operators.
pub fn txns_progress<K, V, T, D, P, C, F, G>(
    passthrough: Stream<G, P>,
    name: &str,
    ctx: &TxnsContext,
    worker_dyncfgs: &ConfigSet,
    client_fn: impl Fn() -> F,
    txns_id: ShardId,
    data_id: ShardId,
    as_of: T,
    until: Antichain<T>,
    data_key_schema: Arc<K::Schema>,
    data_val_schema: Arc<V::Schema>,
) -> (Stream<G, P>, Vec<PressOnDropButton>)
where
    K: Debug + Codec + Send + Sync,
    V: Debug + Codec + Send + Sync,
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64,
    D: Debug + Data + Semigroup + Ord + Codec64 + Send + Sync,
    P: Debug + Data,
    C: TxnsCodec + 'static,
    F: Future<Output = PersistClient> + Send + 'static,
    G: Scope<Timestamp = T>,
{
    let scope = passthrough.scope();
    // Distinguishes this pair of operators in their debug-log names.
    let unique_id = (name, scope.addr()).hashed();
    let use_global_cache = USE_GLOBAL_TXN_CACHE_SOURCE.get(worker_dyncfgs);
    let (remap, source_button) = if use_global_cache {
        txns_progress_source_global::<K, V, T, D, P, C, G>(
            scope,
            name,
            ctx.clone(),
            client_fn(),
            txns_id,
            data_id,
            as_of,
            data_key_schema,
            data_val_schema,
            unique_id,
        )
    } else {
        txns_progress_source_local::<K, V, T, D, P, C, G>(
            scope,
            name,
            client_fn(),
            txns_id,
            data_id,
            as_of,
            data_key_schema,
            data_val_schema,
            unique_id,
        )
    };
    // The source only does work on one worker, but the frontiers operator on
    // every worker needs the complete remap information.
    let broadcast_remap = remap.broadcast();
    let (output, frontiers_button) = txns_progress_frontiers::<K, V, T, D, P, C, G>(
        broadcast_remap,
        passthrough,
        name,
        data_id,
        until,
        unique_id,
    );
    let buttons = vec![source_button, frontiers_button];
    (output, buttons)
}
/// An alternative implementation of [`txns_progress_source_global`] that opens
/// a new [`TxnsCache`] local to the operator.
///
/// Only one worker, chosen by hashing `name` modulo the number of peers, does
/// any work; every other worker returns immediately. The chosen worker emits an
/// initial [`DataRemapEntry`] and then a new one each time the data shard is
/// written (`DataListenNext::ReadDataTo`). Logical progress that involves no
/// write to the data shard (`DataListenNext::EmitLogicalProgress`) is conveyed
/// only by downgrading the output capability.
///
/// Returns the remap stream and the operator's shutdown button.
fn txns_progress_source_local<K, V, T, D, P, C, G>(
    scope: G,
    name: &str,
    client: impl Future<Output = PersistClient> + 'static,
    txns_id: ShardId,
    data_id: ShardId,
    as_of: T,
    data_key_schema: Arc<K::Schema>,
    data_val_schema: Arc<V::Schema>,
    unique_id: u64,
) -> (Stream<G, DataRemapEntry<T>>, PressOnDropButton)
where
    K: Debug + Codec + Send + Sync,
    V: Debug + Codec + Send + Sync,
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64,
    D: Debug + Data + Semigroup + Ord + Codec64 + Send + Sync,
    P: Debug + Data,
    C: TxnsCodec + 'static,
    G: Scope<Timestamp = T>,
{
    let worker_idx = scope.index();
    // Deterministically pick the single worker that runs this source.
    let chosen_worker = usize::cast_from(name.hashed()) % scope.peers();
    let name = format!("txns_progress_source({})", name);
    let mut builder = AsyncOperatorBuilder::new(name.clone(), scope);
    // A more specific name (with `unique_id` and a truncated data shard id)
    // that is only used in the debug logs below.
    let name = format!("{} [{}] {:.9}", name, unique_id, data_id.to_string());
    let (mut remap_output, remap_stream) = builder.new_output();
    let shutdown_button = builder.build(move |capabilities| async move {
        if worker_idx != chosen_worker {
            return;
        }
        let [mut cap]: [_; 1] = capabilities.try_into().expect("one capability per output");
        let client = client.await;
        let mut txns_cache = TxnsCache::<T, C>::open(&client, txns_id, Some(data_id)).await;
        let _ = txns_cache.update_gt(&as_of).await;
        let mut subscribe = txns_cache.data_subscribe(data_id, as_of.clone());
        let data_write = client
            .open_writer::<K, V, T, D>(
                data_id,
                Arc::clone(&data_key_schema),
                Arc::clone(&data_val_schema),
                Diagnostics::from_purpose("data read physical upper"),
            )
            .await
            .expect("schema shouldn't change");
        // NOTE(review): `data_write` is only consumed here, by the snapshot's
        // `unblock_read`; presumably this lets the data shard's physical upper
        // catch up so the snapshot read can complete — confirm.
        if let Some(snapshot) = subscribe.snapshot.take() {
            snapshot.unblock_read(data_write).await;
        }
        debug!("{} emitting {:?}", name, subscribe.remap);
        remap_output.give(&cap, subscribe.remap.clone());
        loop {
            let _ = txns_cache.update_ge(&subscribe.remap.logical_upper).await;
            // Between emitted entries, the logical upper is communicated to
            // the downstream operator purely through this capability.
            cap.downgrade(&subscribe.remap.logical_upper);
            let data_listen_next =
                txns_cache.data_listen_next(&subscribe.data_id, &subscribe.remap.logical_upper);
            debug!(
                "{} data_listen_next at {:?}: {:?}",
                name, subscribe.remap.logical_upper, data_listen_next,
            );
            match data_listen_next {
                // We've caught up to the txns upper and we have to wait for it
                // to advance before asking again.
                //
                // Note that we're asking again with the same input, but once
                // the cache is past remap.logical_upper (as it will be after
                // this update_gt call), we're guaranteed to get an answer.
                DataListenNext::WaitForTxnsProgress => {
                    let _ = txns_cache.update_gt(&subscribe.remap.logical_upper).await;
                }
                // The data shard got a write!
                DataListenNext::ReadDataTo(new_upper) => {
                    // A write means both the physical and logical upper advance.
                    subscribe.remap = DataRemapEntry {
                        physical_upper: new_upper.clone(),
                        logical_upper: new_upper,
                    };
                    debug!("{} emitting {:?}", name, subscribe.remap);
                    remap_output.give(&cap, subscribe.remap.clone());
                }
                // We know there are no writes in `[logical_upper,
                // new_progress)`, so advance our output frontier.
                DataListenNext::EmitLogicalProgress(new_progress) => {
                    assert!(subscribe.remap.physical_upper < new_progress);
                    assert!(subscribe.remap.logical_upper < new_progress);
                    subscribe.remap.logical_upper = new_progress;
                    // As mentioned in the docs on `DataRemapEntry`, we only
                    // emit updates when the physical upper changes (which
                    // happens to make the protocol a tiny bit more
                    // remap-like).
                    debug!("{} not emitting {:?}", name, subscribe.remap);
                }
            }
        }
    });
    (remap_stream, shutdown_button.press_on_drop())
}
/// TODO: I'd much prefer the communication protocol between the two operators
/// to be exactly remap as defined in the [reclocking design doc]. However, we
/// can't quite recover exactly the information necessary to construct that at
/// the moment. Seems worth doing, but in the meantime, intentionally make this
/// look fairly different (`Stream` of `DataRemapEntry` instead of
/// `Collection<FromTime>`) to hopefully minimize confusion. As a performance
/// optimization, we only re-emit this when the _physical_ upper has changed,
/// which means that the frontier of the `Stream<DataRemapEntry<T>>` indicates
/// updates to the logical_upper of the most recent `DataRemapEntry` (i.e. the
/// one with the largest physical_upper).
///
/// Like [`txns_progress_source_local`], only a single worker (chosen by hashing
/// `name` modulo the number of peers) does any work. That worker obtains the
/// process-global [`TxnsRead`] from `ctx` (starting it on first use),
/// subscribes to `data_id` as of `as_of`, and forwards the remap entries it
/// receives according to the protocol above. Returns the remap stream and the
/// operator's shutdown button.
///
/// [reclocking design doc]:
/// https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20210714_reclocking.md
fn txns_progress_source_global<K, V, T, D, P, C, G>(
    scope: G,
    name: &str,
    ctx: TxnsContext,
    client: impl Future<Output = PersistClient> + 'static,
    txns_id: ShardId,
    data_id: ShardId,
    as_of: T,
    data_key_schema: Arc<K::Schema>,
    data_val_schema: Arc<V::Schema>,
    unique_id: u64,
) -> (Stream<G, DataRemapEntry<T>>, PressOnDropButton)
where
    K: Debug + Codec + Send + Sync,
    V: Debug + Codec + Send + Sync,
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64,
    D: Debug + Data + Semigroup + Ord + Codec64 + Send + Sync,
    P: Debug + Data,
    C: TxnsCodec + 'static,
    G: Scope<Timestamp = T>,
{
    let worker_idx = scope.index();
    // Deterministically pick the single worker that runs this source.
    let chosen_worker = usize::cast_from(name.hashed()) % scope.peers();
    let name = format!("txns_progress_source({})", name);
    let mut builder = AsyncOperatorBuilder::new(name.clone(), scope);
    // A more specific name (with `unique_id` and a truncated data shard id)
    // that is only used in the debug logs below.
    let name = format!("{} [{}] {:.9}", name, unique_id, data_id.to_string());
    let (mut remap_output, remap_stream) = builder.new_output();
    let shutdown_button = builder.build(move |capabilities| async move {
        if worker_idx != chosen_worker {
            return;
        }
        let [mut cap]: [_; 1] = capabilities.try_into().expect("one capability per output");
        let client = client.await;
        let txns_read = ctx.get_or_init::<T, C>(&client, txns_id).await;
        let _ = txns_read.update_gt(as_of.clone()).await;
        let data_write = client
            .open_writer::<K, V, T, D>(
                data_id,
                Arc::clone(&data_key_schema),
                Arc::clone(&data_val_schema),
                Diagnostics::from_purpose("data read physical upper"),
            )
            .await
            .expect("schema shouldn't change");
        let mut rx = txns_read
            .data_subscribe(data_id, as_of.clone(), Box::new(data_write))
            .await;
        debug!("{} starting as_of={:?}", name, as_of);
        // Physical upper of the most recently emitted entry. Starting at
        // `T::minimum()` means an entry whose physical upper is still the
        // minimum is never emitted; its logical upper still reaches the
        // downstream operator through `cap.downgrade` below.
        let mut physical_upper = T::minimum();
        while let Some(remap) = rx.recv().await {
            assert!(physical_upper <= remap.physical_upper);
            assert!(physical_upper < remap.logical_upper);
            let logical_upper = remap.logical_upper.clone();
            // As mentioned in the docs on this function, we only
            // emit updates when the physical upper changes (which
            // happens to make the protocol a tiny bit more
            // remap-like).
            if remap.physical_upper != physical_upper {
                physical_upper = remap.physical_upper.clone();
                debug!("{} emitting {:?}", name, remap);
                remap_output.give(&cap, remap);
            } else {
                debug!("{} not emitting {:?}", name, remap);
            }
            cap.downgrade(&logical_upper);
        }
    });
    (remap_stream, shutdown_button.press_on_drop())
}
/// Forwards `passthrough` unchanged while advancing the output frontier
/// according to the [`DataRemapEntry`] updates arriving on `remap`.
///
/// Both inputs are disconnected, so the output frontier is determined solely
/// by this operator's single capability. The capability's time also records
/// how far the passthrough input has been copied. Whenever that time has
/// reached the current entry's `physical_upper`, the capability is downgraded
/// to the entry's `logical_upper`. Passthrough frontier progress is forwarded
/// as well. If the `remap` input closes, only the passthrough input drives
/// progress. The operator shuts down once the passthrough frontier reaches
/// `until` or becomes empty.
///
/// Returns the output stream and the operator's shutdown button.
fn txns_progress_frontiers<K, V, T, D, P, C, G>(
    remap: Stream<G, DataRemapEntry<T>>,
    passthrough: Stream<G, P>,
    name: &str,
    data_id: ShardId,
    until: Antichain<T>,
    unique_id: u64,
) -> (Stream<G, P>, PressOnDropButton)
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64,
    D: Data + Semigroup + Codec64 + Send + Sync,
    P: Debug + Data,
    C: TxnsCodec,
    G: Scope<Timestamp = T>,
{
    let name = format!("txns_progress_frontiers({})", name);
    let mut builder = AsyncOperatorBuilder::new(name.clone(), passthrough.scope());
    // A more specific name (unique_id, worker index/peers, truncated data
    // shard id) that is only used in the debug logs below.
    let name = format!(
        "{} [{}] {}/{} {:.9}",
        name,
        unique_id,
        passthrough.scope().index(),
        passthrough.scope().peers(),
        data_id.to_string(),
    );
    let (mut passthrough_output, passthrough_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut remap_input = builder.new_disconnected_input(&remap, Pipeline);
    let mut passthrough_input = builder.new_disconnected_input(&passthrough, Pipeline);
    let shutdown_button = builder.build(move |capabilities| async move {
        let [mut cap]: [_; 1] = capabilities.try_into().expect("one capability per output");
        // None is used to indicate that both uppers are the empty antichain.
        let mut remap = Some(DataRemapEntry {
            physical_upper: T::minimum(),
            logical_upper: T::minimum(),
        });
        // NB: The following loop uses `cap.time()` to track how far we've
        // progressed in copying along the passthrough input.
        loop {
            debug!("{} remap {:?}", name, remap);
            if let Some(r) = remap.as_ref() {
                assert!(r.physical_upper <= r.logical_upper);
                // If we've passed through data to at least `physical_upper`,
                // then it means we can artificially advance the upper of the
                // output to `logical_upper`. This also indicates that we need
                // to wait for the next DataRemapEntry. It can either (A) have
                // the same physical upper or (B) have a larger physical upper.
                //
                // - If (A), then we would again satisfy this `physical_upper`
                //   check and advance the logical upper again, ...
                // - If (B), then we'd fall down to the code below, which copies
                //   the passthrough data until the frontier passes
                //   `physical_upper`, then loops back up here.
                if r.physical_upper.less_equal(cap.time()) {
                    if cap.time() < &r.logical_upper {
                        cap.downgrade(&r.logical_upper);
                    }
                    remap = txns_progress_frontiers_read_remap_input(
                        &name,
                        &mut remap_input,
                        r.clone(),
                    )
                    .await;
                    continue;
                }
            }
            // This only returns None when there are no more data left. Turn it
            // into an empty frontier progress so we can re-use the shutdown
            // code below.
            let event = passthrough_input
                .next()
                .await
                .unwrap_or_else(|| AsyncEvent::Progress(Antichain::new()));
            match event {
                // NB: Ignore the data_cap because this input is disconnected.
                AsyncEvent::Data(_data_cap, mut data) => {
                    // NB: Nothing to do here for `until` because both the
                    // `shard_source` (before this operator) and
                    // `mfp_and_decode` (after this operator) do the necessary
                    // filtering.
                    debug!("{} emitting data {:?}", name, data);
                    passthrough_output.give_container(&cap, &mut data);
                }
                AsyncEvent::Progress(new_progress) => {
                    // If `until.less_equal(new_progress)`, it means that all
                    // subsequent batches will contain only times greater or
                    // equal to `until`, which means they can be dropped in
                    // their entirety.
                    //
                    // Ideally this check would live in `txns_progress_source`,
                    // but that turns out to be much more invasive (requires
                    // replacing lots of `T`s with `Antichain<T>`s). Given that
                    // we've been thinking about reworking the operators, do the
                    // easy but more wasteful thing for now.
                    if PartialOrder::less_equal(&until, &new_progress) {
                        debug!(
                            "{} progress {:?} has passed until {:?}",
                            name,
                            new_progress.elements(),
                            until.elements()
                        );
                        return;
                    }
                    // We reached the empty frontier! Shut down.
                    let Some(new_progress) = new_progress.into_option() else {
                        return;
                    };
                    // Recall that any reads of the data shard are always
                    // correct, so given that we've passed through any data
                    // from the input, that means we're free to pass through
                    // frontier updates too.
                    if cap.time() < &new_progress {
                        debug!("{} downgrading cap to {:?}", name, new_progress);
                        cap.downgrade(&new_progress);
                    }
                }
            }
        }
    });
    (passthrough_stream, shutdown_button.press_on_drop())
}
/// Waits on the remap input until it can return a newer view of `remap`.
///
/// - A progress event whose (non-empty) frontier is past
///   `remap.logical_upper` advances `remap.logical_upper` to that frontier and
///   returns it. Other progress events are skipped.
/// - A data event keeps, out of the received entries, whichever has the
///   largest `logical_upper` that is also larger than the current one, and
///   then returns. This happens even if no entry was newer, in which case
///   `remap` comes back unchanged.
/// - Returns `None` once the input is exhausted.
///
/// # Panics
///
/// Panics if an entry with a larger `logical_upper` has a `physical_upper`
/// behind the current one.
async fn txns_progress_frontiers_read_remap_input<T, C>(
    name: &str,
    input: &mut AsyncInputHandle<T, Vec<DataRemapEntry<T>>, C>,
    mut remap: DataRemapEntry<T>,
) -> Option<DataRemapEntry<T>>
where
    T: Timestamp + TotalOrder,
    C: InputConnection<T>,
{
    while let Some(event) = input.next().await {
        let xs = match event {
            AsyncEvent::Progress(logical_upper) => {
                // The input frontier conveys logical-upper advancement for
                // the most recent entry (see `txns_progress_source_global`).
                if let Some(logical_upper) = logical_upper.into_option() {
                    if remap.logical_upper < logical_upper {
                        remap.logical_upper = logical_upper;
                        return Some(remap);
                    }
                }
                continue;
            }
            AsyncEvent::Data(_cap, xs) => xs,
        };
        for x in xs {
            debug!("{} got remap {:?}", name, x);
            // Don't assume anything about the ordering.
            if remap.logical_upper < x.logical_upper {
                assert!(
                    remap.physical_upper <= x.physical_upper,
                    "previous remap physical upper {:?} is ahead of new remap physical upper {:?}",
                    remap.physical_upper,
                    x.physical_upper,
                );
                // TODO: If the physical upper has advanced, that's a very
                // strong hint that the data shard is about to be written to.
                // Because the data shard's upper advances sparsely (on write,
                // but not on passage of time) which invalidates the "every 1s"
                // assumption of the default tuning, we've had to de-tune the
                // listen sleeps on the paired persist_source. Maybe we use "one
                // state" to wake it up in case pubsub doesn't and remove the
                // listen polling entirely? (NB: This would have to happen in
                // each worker so that it's guaranteed to happen in each
                // process.)
                remap = x;
            }
        }
        return Some(remap);
    }
    // remap_input is closed, which indicates the data shard is finished.
    None
}
/// The process global [`TxnsRead`] that any operator can communicate with.
#[derive(Default, Debug, Clone)]
pub struct TxnsContext {
    /// Lazily started `TxnsRead<T>`, type-erased because the timestamp type is
    /// only known at the call site of `get_or_init`.
    read: Arc<tokio::sync::OnceCell<Box<dyn Any + Send + Sync>>>,
}

impl TxnsContext {
    /// Returns a clone of the shared [`TxnsRead`], starting it on `txns_id`
    /// with a clone of `client` the first time any caller asks.
    ///
    /// # Panics
    ///
    /// Panics if the shared reader was started with a different timestamp type
    /// `T`, or for a txns shard other than `txns_id`.
    async fn get_or_init<T, C>(&self, client: &PersistClient, txns_id: ShardId) -> TxnsRead<T>
    where
        T: Timestamp + Lattice + Codec64 + TotalOrder + StepForward,
        C: TxnsCodec + 'static,
    {
        let start_read = || {
            let client = client.clone();
            async move {
                let erased: Box<dyn Any + Send + Sync> =
                    Box::new(TxnsRead::<T>::start::<C>(client, txns_id).await);
                erased
            }
        };
        let erased = self.read.get_or_init(start_read).await;
        let read = erased
            .downcast_ref::<TxnsRead<T>>()
            .expect("timestamp types should match");
        // We initially only have one txns shard in the system.
        assert_eq!(&txns_id, read.txns_id());
        read.clone()
    }
}
// Existing configs use the prefix "persist_txns_" for historical reasons. New
// configs should use the prefix "txn_wal_".
/// Initial backoff (default 1024ms) that [`txns_data_shard_retry_params`] uses
/// when polling a txns data shard's persist_source for new batches.
pub(crate) const DATA_SHARD_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
    "persist_txns_data_shard_retryer_initial_backoff",
    Duration::from_millis(1024),
    "The initial backoff when polling for new batches from a txns data shard persist_source.",
);
/// Backoff multiplier (default 2) that [`txns_data_shard_retry_params`] uses
/// when polling a txns data shard's persist_source for new batches.
pub(crate) const DATA_SHARD_RETRYER_MULTIPLIER: Config<u32> = Config::new(
    "persist_txns_data_shard_retryer_multiplier",
    2,
    "The backoff multiplier when polling for new batches from a txns data shard persist_source.",
);
/// Backoff clamp (default 16s) that [`txns_data_shard_retry_params`] uses when
/// polling a txns data shard's persist_source for new batches.
pub(crate) const DATA_SHARD_RETRYER_CLAMP: Config<Duration> = Config::new(
    "persist_txns_data_shard_retryer_clamp",
    Duration::from_secs(16),
    "The backoff clamp duration when polling for new batches from a txns data shard persist_source.",
);
/// Retry configuration for the txn-wal data shard override of
/// `next_listen_batch`.
///
/// The backoff, multiplier, and clamp are read from `cfg` via the
/// `persist_txns_data_shard_retryer_*` configs; `fixed_sleep` is always zero.
pub fn txns_data_shard_retry_params(cfg: &ConfigSet) -> RetryParameters {
    let initial_backoff = DATA_SHARD_RETRYER_INITIAL_BACKOFF.get(cfg);
    let multiplier = DATA_SHARD_RETRYER_MULTIPLIER.get(cfg);
    let clamp = DATA_SHARD_RETRYER_CLAMP.get(cfg);
    RetryParameters {
        fixed_sleep: Duration::ZERO,
        initial_backoff,
        multiplier,
        clamp,
    }
}
/// A helper for subscribing to a data shard using the timely operators.
///
/// This could instead be a wrapper around a [Subscribe], but it's only used in
/// tests and maelstrom, so do it by wrapping the timely operators to get
/// additional coverage. For the same reason, hardcode the K, V, T, D types.
///
/// [Subscribe]: mz_persist_client::read::Subscribe
pub struct DataSubscribe {
    /// The `as_of` this subscription was created with.
    pub(crate) as_of: u64,
    /// The timely worker (using the `Thread` allocator) that runs the dataflow.
    pub(crate) worker: Worker<timely::communication::allocator::Thread>,
    /// Probe on the decoded `shard_source` output, before `txns_progress`.
    data: ProbeHandle<u64>,
    /// Probe on the `txns_progress` output; `progress()` reports its frontier.
    txns: ProbeHandle<u64>,
    /// Receives the events captured from the `txns_progress` output.
    capture: mpsc::Receiver<Event<u64, Vec<(String, u64, i64)>>>,
    /// `(key, time, diff)` updates captured so far (and not yet taken).
    output: Vec<(String, u64, i64)>,
    /// Shutdown buttons for the dataflow's operators, held for the lifetime of
    /// this subscription.
    _tokens: Vec<PressOnDropButton>,
}
impl std::fmt::Debug for DataSubscribe {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let DataSubscribe {
as_of,
worker: _,
data,
txns,
capture: _,
output,
_tokens: _,
} = self;
f.debug_struct("DataSubscribe")
.field("as_of", as_of)
.field("data", data)
.field("txns", txns)
.field("output", output)
.finish_non_exhaustive()
}
}
impl DataSubscribe {
    /// Creates a new [DataSubscribe].
    ///
    /// Renders a dataflow on a fresh timely worker (using the `Thread`
    /// allocator). The dataflow reads `data_id` with a `shard_source` as of
    /// `as_of` (bounded by `until`) and pipes the decoded `(key, time, diff)`
    /// updates through [`txns_progress`] for `txns_id`. `use_global_txn_cache`
    /// sets `USE_GLOBAL_TXN_CACHE_SOURCE`, which chooses the txns source
    /// implementation.
    pub fn new(
        name: &str,
        client: PersistClient,
        txns_id: ShardId,
        data_id: ShardId,
        as_of: u64,
        until: Antichain<u64>,
        use_global_txn_cache: bool,
    ) -> Self {
        let mut worker = Worker::new(
            WorkerConfig::default(),
            timely::communication::allocator::Thread::new(),
        );
        let (data, txns, capture, tokens) = worker.dataflow::<u64, _, _>(|scope| {
            let (data_stream, shard_source_token) = scope.scoped::<u64, _, _>("hybrid", |scope| {
                let client = client.clone();
                let (data_stream, token) = shard_source::<String, (), u64, i64, _, _, _, _>(
                    scope,
                    name,
                    move || std::future::ready(client.clone()),
                    data_id,
                    Some(Antichain::from_elem(as_of)),
                    SnapshotMode::Include,
                    until.clone(),
                    false.then_some(|_, _: &_, _| unreachable!()),
                    Arc::new(StringSchema),
                    Arc::new(UnitSchema),
                    |_, _| true,
                    false.then_some(|| unreachable!()),
                    async {},
                    |error| panic!("data_subscribe: {error}"),
                    ProjectionPushdown::FetchAll,
                );
                (data_stream.leave(), token)
            });
            let (data, txns) = (ProbeHandle::new(), ProbeHandle::new());
            let data_stream = data_stream.flat_map(|part| {
                let part = part.parse();
                part.part.map(|((k, v), t, d)| {
                    // Decode failures panic; acceptable because this helper is
                    // only used in tests and maelstrom.
                    let (k, ()) = (k.unwrap(), v.unwrap());
                    (k, t, d)
                })
            });
            let data_stream = data_stream.probe_with(&data);
            // We purposely do not use the `ConfigSet` in `client` so that
            // different tests can set different values.
            let config_set = ConfigSet::default().add(&USE_GLOBAL_TXN_CACHE_SOURCE);
            let mut updates = ConfigUpdates::default();
            updates.add(&USE_GLOBAL_TXN_CACHE_SOURCE, use_global_txn_cache);
            updates.apply(&config_set);
            let (data_stream, mut txns_progress_token) =
                txns_progress::<String, (), u64, i64, _, TxnsCodecDefault, _, _>(
                    data_stream,
                    name,
                    &TxnsContext::default(),
                    &config_set,
                    || std::future::ready(client.clone()),
                    txns_id,
                    data_id,
                    as_of,
                    until,
                    Arc::new(StringSchema),
                    Arc::new(UnitSchema),
                );
            let data_stream = data_stream.probe_with(&txns);
            let mut tokens = shard_source_token;
            tokens.append(&mut txns_progress_token);
            (data, txns, data_stream.capture(), tokens)
        });
        Self {
            as_of,
            worker,
            data,
            txns,
            capture,
            output: Vec::new(),
            _tokens: tokens,
        }
    }

    /// Returns the exclusive progress of the dataflow.
    ///
    /// This is the frontier of the `txns_progress` output, or `u64::MAX` once
    /// that frontier is empty.
    pub fn progress(&self) -> u64 {
        self.txns
            .with_frontier(|f| *f.as_option().unwrap_or(&u64::MAX))
    }

    /// Steps the dataflow, capturing output.
    pub fn step(&mut self) {
        self.worker.step();
        self.capture_output()
    }

    /// Drains every event currently buffered in the capture channel and
    /// appends captured data to `output`. Progress events are ignored.
    ///
    /// Returns as soon as the channel is empty or disconnected; it never
    /// blocks.
    pub(crate) fn capture_output(&mut self) {
        while let Ok(event) = self.capture.try_recv() {
            match event {
                Event::Progress(_) => {}
                Event::Messages(_, mut msgs) => self.output.append(&mut msgs),
            }
        }
    }

    /// Steps the dataflow past the given time, capturing output.
    #[cfg(test)]
    pub async fn step_past(&mut self, ts: u64) {
        while self.txns.less_equal(&ts) {
            tracing::trace!(
                "progress at {:?}",
                self.txns.with_frontier(|x| x.to_owned()).elements()
            );
            self.step();
            // Give other tasks (e.g. persist background work) a chance to run.
            tokio::task::yield_now().await;
        }
    }

    /// Returns captured output.
    pub fn output(&self) -> &Vec<(String, u64, i64)> {
        &self.output
    }
}
/// A handle to a [DataSubscribe] running in a task.
#[derive(Debug)]
pub struct DataSubscribeTask {
    /// Carries step requests. A `None` timestamp requests one step, a
    /// `Some(ts)` requests stepping until we progress beyond `ts`.
    /// Each request carries a oneshot sender on which the task replies with
    /// the newly captured output and its current progress.
    tx: std::sync::mpsc::Sender<(
        Option<u64>,
        tokio::sync::oneshot::Sender<(Vec<(String, u64, i64)>, u64)>,
    )>,
    /// The blocking task running the [DataSubscribe]. It resolves to all output
    /// captured over the task's lifetime.
    task: mz_ore::task::JoinHandle<Vec<(String, u64, i64)>>,
    /// Output received from the task so far.
    output: Vec<(String, u64, i64)>,
    /// The most recent progress reported by the task.
    progress: u64,
}
impl DataSubscribeTask {
/// Creates a new [DataSubscribeTask].
pub async fn new(
client: PersistClient,
txns_id: ShardId,
data_id: ShardId,
as_of: u64,
) -> Self {
let cache = TxnsCache::open(&client, txns_id, Some(data_id)).await;
let (tx, rx) = std::sync::mpsc::channel();
let task = mz_ore::task::spawn_blocking(
|| "data_subscribe task",
move || Self::task(client, cache, data_id, as_of, rx),
);
DataSubscribeTask {
tx,
task,
output: Vec::new(),
progress: 0,
}
}
#[cfg(test)]
async fn step(&mut self) {
self.send(None).await;
}
/// Steps the dataflow past the given time, capturing output.
pub async fn step_past(&mut self, ts: u64) -> u64 {
self.send(Some(ts)).await;
self.progress
}
/// Returns captured output.
pub fn output(&self) -> &Vec<(String, u64, i64)> {
&self.output
}
async fn send(&mut self, ts: Option<u64>) {
let (tx, rx) = tokio::sync::oneshot::channel();
self.tx.send((ts, tx)).expect("task should be running");
let (mut new_output, new_progress) = rx.await.expect("task should be running");
self.output.append(&mut new_output);
assert!(self.progress <= new_progress);
self.progress = new_progress;
}
/// Signals for the task to exit, and then waits for this to happen.
///
/// _All_ output from the lifetime of the task (not just what was previously
/// captured) is returned.
pub async fn finish(self) -> Vec<(String, u64, i64)> {
// Closing the channel signals the task to exit.
drop(self.tx);
self.task.wait_and_assert_finished().await
}
fn task(
client: PersistClient,
cache: TxnsCache<u64>,
data_id: ShardId,
as_of: u64,
rx: std::sync::mpsc::Receiver<(
Option<u64>,
tokio::sync::oneshot::Sender<(Vec<(String, u64, i64)>, u64)>,
)>,
) -> Vec<(String, u64, i64)> {
let mut subscribe = DataSubscribe::new(
"DataSubscribeTask",
client.clone(),
cache.txns_id(),
data_id,
as_of,
Antichain::new(),
true,
);
let mut output = Vec::new();
loop {
let (ts, tx) = match rx.try_recv() {
Ok(x) => x,
Err(TryRecvError::Empty) => {
// No requests, continue stepping so nothing deadlocks.
subscribe.step();
continue;
}
Err(TryRecvError::Disconnected) => {
// All done! Return our output.
return output;
}
};
// Always step at least once.
subscribe.step();
// If we got a ts, make sure to step past it.
if let Some(ts) = ts {
while subscribe.progress() <= ts {
subscribe.step();
}
}
let new_output = std::mem::take(&mut subscribe.output);
output.extend(new_output.iter().cloned());
let _ = tx.send((new_output, subscribe.progress()));
}
}
}
#[cfg(test)]
mod tests {
    use itertools::{Either, Itertools};
    use mz_persist_types::Opaque;

    use crate::tests::writer;
    use crate::txns::TxnsHandle;

    use super::*;

    impl<K, V, T, D, O, C> TxnsHandle<K, V, T, D, O, C>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + TotalOrder + StepForward + Codec64,
        D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
        O: Opaque + Debug + Codec64,
        C: TxnsCodec,
    {
        /// Test helper: starts a [DataSubscribeTask] reading `data_id` as of
        /// `as_of`, using this handle's txns shard.
        async fn subscribe_task(
            &self,
            client: &PersistClient,
            data_id: ShardId,
            as_of: u64,
        ) -> DataSubscribeTask {
            DataSubscribeTask::new(client.clone(), self.txns_id(), data_id, as_of).await
        }
    }

    /// Starts a new subscription (as of 5) on `d0` after each stage of the
    /// shard's history, stepping every earlier subscription each time. The
    /// stages are:
    /// - before the shard is registered,
    /// - after registering it,
    /// - after an unrelated write,
    /// - after writes before, at, and past the as_of.
    ///
    /// Afterwards, every subscription must report progress 8 after stepping
    /// past 7. Each must also return output that `log.assert_eq(d0, 5, 8, ..)`
    /// accepts, no matter when it was started.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // too slow
    async fn data_subscribe() {
        /// Sends one step request to each subscription, in order.
        async fn step(subs: &mut [DataSubscribeTask]) {
            for sub in subs.iter_mut() {
                sub.step().await;
            }
        }

        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let d0 = ShardId::new();

        // Start a subscription before the shard gets registered.
        let mut subs = Vec::new();
        subs.push(txns.subscribe_task(&client, d0, 5).await);
        step(&mut subs).await;

        // Now register the shard. Also start a new subscription and step the
        // previous one (plus repeat this for every later step).
        txns.register(1, [writer(&client, d0).await]).await.unwrap();
        subs.push(txns.subscribe_task(&client, d0, 5).await);
        step(&mut subs).await;

        // Now write something unrelated.
        let d1 = txns.expect_register(2).await;
        txns.expect_commit_at(3, d1, &["nope"], &log).await;
        subs.push(txns.subscribe_task(&client, d0, 5).await);
        step(&mut subs).await;

        // Now write to our shard before the as_of.
        txns.expect_commit_at(4, d0, &["4"], &log).await;
        subs.push(txns.subscribe_task(&client, d0, 5).await);
        step(&mut subs).await;

        // Now write to our shard at the as_of.
        txns.expect_commit_at(5, d0, &["5"], &log).await;
        subs.push(txns.subscribe_task(&client, d0, 5).await);
        step(&mut subs).await;

        // Now write to our shard past the as_of.
        txns.expect_commit_at(6, d0, &["6"], &log).await;
        subs.push(txns.subscribe_task(&client, d0, 5).await);
        step(&mut subs).await;

        // Now write something unrelated again.
        txns.expect_commit_at(7, d1, &["nope"], &log).await;
        subs.push(txns.subscribe_task(&client, d0, 5).await);
        step(&mut subs).await;

        // Verify that the dataflows can progress to the expected point and that
        // we read the right thing no matter when the dataflow started.
        for mut sub in subs {
            let progress = sub.step_past(7).await;
            assert_eq!(progress, 8);
            log.assert_eq(d0, 5, 8, sub.finish().await);
        }
    }

    /// Checks a subscription across the full life of a data shard. The shard
    /// is written through txns, then forgotten (unregistered). After that it
    /// is written directly and finally sealed to writes with an empty upper.
    ///
    /// The subscription must keep emitting the direct write, and must read
    /// the right data whether it starts before or after these events.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // too slow
    async fn subscribe_shard_finalize() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let d0 = txns.expect_register(1).await;

        // Start the operator as_of the register ts.
        let mut sub = txns.read_cache().expect_subscribe(&client, d0, 1);
        sub.step_past(1).await;

        // Write to it via txns.
        txns.expect_commit_at(2, d0, &["foo"], &log).await;
        sub.step_past(2).await;

        // Unregister it.
        txns.forget(3, [d0]).await.unwrap();
        sub.step_past(3).await;

        // TODO: Hard mode, see if we can get the rest of this test to work even
        // _without_ the txns shard advancing.
        txns.begin().commit_at(&mut txns, 7).await.unwrap();

        // The operator should continue to emit data written directly even
        // though it's no longer in the txns set.
        let mut d0_write = writer(&client, d0).await;
        let key = "bar".to_owned();
        crate::small_caa(|| "test", &mut d0_write, &[((&key, &()), &5, 1)], 4, 6)
            .await
            .unwrap();
        log.record((d0, key, 5, 1));
        sub.step_past(4).await;

        // Now finalize the shard to writes.
        let () = d0_write
            .compare_and_append_batch(&mut [], Antichain::from_elem(6), Antichain::new())
            .await
            .unwrap()
            .unwrap();
        while sub.txns.less_than(&u64::MAX) {
            sub.step();
            tokio::task::yield_now().await;
        }

        // Make sure we read the correct things.
        log.assert_eq(d0, 1, u64::MAX, sub.output().clone());

        // Also make sure that we can read the right things if we start up after
        // the forget but before the direct write and ditto after the direct
        // write.
        log.assert_subscribe(d0, 4, u64::MAX).await;
        log.assert_subscribe(d0, 6, u64::MAX).await;
    }

    /// Checks that a subscription started (as of 0) before its shard is
    /// registered advances past both the register ts (10) and the forget
    /// ts (20).
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // too slow
    async fn subscribe_shard_register_forget() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let d0 = ShardId::new();

        // Start a subscription on the data shard.
        let mut sub = txns.read_cache().expect_subscribe(&client, d0, 0);
        assert_eq!(sub.progress(), 0);

        // Register the shard at 10.
        txns.register(10, [writer(&client, d0).await])
            .await
            .unwrap();
        sub.step_past(10).await;
        assert!(
            sub.progress() > 10,
            "operator should advance past 10 when shard is registered"
        );

        // Forget the shard at 20.
        txns.forget(20, [d0]).await.unwrap();
        sub.step_past(20).await;
        assert!(
            sub.progress() > 20,
            "operator should advance past 20 when shard is forgotten"
        );
    }

    /// Checks the raw captured events of a subscription with as_of 3 and
    /// until 5 over writes at 2..=7:
    /// - updates at 2 and 3 are emitted at time 3,
    /// - the update at 4 is emitted as its own batch,
    /// - nothing at or beyond the until is emitted,
    /// - no progress message reaches the until.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // too slow
    async fn as_of_until() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();

        let d0 = txns.expect_register(1).await;
        txns.expect_commit_at(2, d0, &["2"], &log).await;
        txns.expect_commit_at(3, d0, &["3"], &log).await;
        txns.expect_commit_at(4, d0, &["4"], &log).await;
        txns.expect_commit_at(5, d0, &["5"], &log).await;
        txns.expect_commit_at(6, d0, &["6"], &log).await;
        txns.expect_commit_at(7, d0, &["7"], &log).await;

        let until = 5;
        let mut sub = DataSubscribe::new(
            "as_of_until",
            client,
            txns.txns_id(),
            d0,
            3,
            Antichain::from_elem(until),
            true,
        );
        // Manually step the dataflow, instead of going through the
        // `DataSubscribe` helper because we're interested in all captured
        // events. Stop once the txns frontier is past the until.
        while sub.txns.less_equal(&until) {
            sub.worker.step();
            tokio::task::yield_now().await;
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
        let (actual_progresses, actual_events): (Vec<_>, Vec<_>) =
            sub.capture.into_iter().partition_map(|event| match event {
                Event::Progress(progress) => Either::Left(progress),
                Event::Messages(ts, data) => Either::Right((ts, data)),
            });
        let expected = vec![
            (3, vec![("2".to_owned(), 3, 1), ("3".to_owned(), 3, 1)]),
            (3, vec![("4".to_owned(), 4, 1)]),
        ];
        assert_eq!(actual_events, expected);

        // The number and contents of progress messages is not guaranteed and
        // depends on the downgrade behavior. The only thing we can assert is
        // the max progress timestamp, if there is one, is less than the until.
        if let Some(max_progress_ts) = actual_progresses
            .into_iter()
            .flatten()
            .map(|(ts, _diff)| ts)
            .max()
        {
            assert!(max_progress_ts < until, "{max_progress_ts} < {until}");
        }
    }
}