Skip to main content

mz_persist_client/operators/
shard_source.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A source that reads from a persist shard.
11
12use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::collections::hash_map::DefaultHasher;
15use std::convert::Infallible;
16use std::fmt::{Debug, Formatter};
17use std::future::Future;
18use std::hash::{Hash, Hasher};
19use std::pin::pin;
20use std::rc::Rc;
21use std::sync::Arc;
22use std::time::Instant;
23
24use anyhow::anyhow;
25use arrow::array::ArrayRef;
26use differential_dataflow::Hashable;
27use differential_dataflow::difference::Monoid;
28use differential_dataflow::lattice::Lattice;
29use futures_util::StreamExt;
30use mz_ore::cast::CastFrom;
31use mz_ore::collections::CollectionExt;
32use mz_persist_types::stats::PartStats;
33use mz_persist_types::{Codec, Codec64};
34use mz_timely_util::builder_async::{
35    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
36};
37use timely::PartialOrder;
38use timely::container::CapacityContainerBuilder;
39use timely::dataflow::channels::pact::{Exchange, Pipeline};
40use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Enter, Feedback, Leave};
41use timely::dataflow::{Scope, StreamVec};
42use timely::order::TotalOrder;
43use timely::progress::frontier::AntichainRef;
44use timely::progress::{Antichain, Timestamp, timestamp::Refines};
45use tracing::{debug, trace};
46
47use crate::batch::BLOB_TARGET_SIZE;
48use crate::cfg::{RetryParameters, USE_CRITICAL_SINCE_SOURCE};
49use crate::fetch::{ExchangeableBatchPart, FetchedBlob, Lease};
50use crate::internal::state::BatchPart;
51use crate::stats::{STATS_AUDIT_PERCENT, STATS_FILTER_ENABLED};
52use crate::{Diagnostics, PersistClient, ShardId};
53
/// The result of applying an MFP (map/filter/project) to a part, if we know it.
#[derive(Debug, Clone, PartialEq, Default)]
pub enum FilterResult {
    /// This dataflow may or may not filter out any row in this part; the part
    /// must be fetched and filtered row-by-row.
    #[default]
    Keep,
    /// This dataflow is guaranteed to filter out all records in this part.
    Discard,
    /// This dataflow will keep all the rows, but the values are irrelevant:
    /// include the given single-row KV data instead.
    ReplaceWith {
        /// The single-element key column.
        key: ArrayRef,
        /// The single-element val column.
        val: ArrayRef,
    },
}
71
72impl FilterResult {
73    /// The noop filtering function: return the default value for all parts.
74    pub fn keep_all<T>(_stats: &PartStats, _frontier: AntichainRef<T>) -> FilterResult {
75        Self::Keep
76    }
77}
78
/// Many dataflows, including the Persist source, encounter errors that are neither data-plane
/// errors (a la SourceData) nor bugs. This includes:
/// - lease timeouts: the source has failed to heartbeat, the lease timed out, and our inputs are
///   GCed away. (But we'd be able to use the compaction output if we restart.)
/// - external transactions: our Kafka transaction has failed, and we can't re-create it without
///   re-ingesting a bunch of data we no longer have in memory. (But we could do on restart.)
///
/// It would be an error to simply exit from our dataflow operator, since that allows timely
/// frontiers to advance, which signals progress that we haven't made. So we report the error and
/// attempt to trigger a restart: either directly (via a `halt!`) or indirectly with a callback.
#[derive(Clone)]
pub enum ErrorHandler {
    /// Halt the process on error. The payload names the component, for inclusion
    /// in the halt message.
    Halt(&'static str),
    /// Signal an error to a higher-level supervisor via the given callback.
    Signal(Rc<dyn Fn(anyhow::Error) + 'static>),
}
96
97impl Debug for ErrorHandler {
98    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
99        match self {
100            ErrorHandler::Halt(name) => f.debug_tuple("ErrorHandler::Halt").field(name).finish(),
101            ErrorHandler::Signal(_) => f.write_str("ErrorHandler::Signal"),
102        }
103    }
104}
105
impl ErrorHandler {
    /// Returns a new error handler that uses the provided function to signal an error.
    pub fn signal(signal_fn: impl Fn(anyhow::Error) + 'static) -> Self {
        Self::Signal(Rc::new(signal_fn))
    }

    /// Signal an error to an error handler. This function never returns: logically it blocks until
    /// restart, though that restart might be sooner (if halting) or later (if triggering a dataflow
    /// restart, for example).
    pub async fn report_and_stop(&self, error: anyhow::Error) -> ! {
        match self {
            ErrorHandler::Halt(name) => {
                mz_ore::halt!("unhandled error in {name}: {error:#}")
            }
            ErrorHandler::Signal(callback) => {
                let () = callback(error);
                // After signaling, park forever rather than returning: returning
                // would let timely frontiers advance and falsely report progress.
                // The signal is expected to eventually tear this dataflow down.
                std::future::pending().await
            }
        }
    }
}
127
/// Creates a new source that reads from a persist shard, distributing the work
/// of reading data to all timely workers.
///
/// All times emitted will have been [advanced by] the given `as_of` frontier.
/// All updates at times greater or equal to `until` will be suppressed.
/// The `filter_fn` argument may use a part's pushdown stats to prove that none
/// of its records are needed, in which case fetching that part is skipped (or
/// its contents substituted); see [`FilterResult`].
///
/// The `desc_transformer` interposes an operator in the stream before the
/// chosen data is fetched. This is currently used to provide flow control... see
/// usages for details.
///
/// [advanced by]: differential_dataflow::lattice::Lattice::advance_by
pub fn shard_source<'inner, 'outer, K, V, T, D, DT, TOuter, C>(
    outer: Scope<'outer, TOuter>,
    scope: Scope<'inner, T>,
    name: &str,
    client: impl Fn() -> C,
    shard_id: ShardId,
    as_of: Option<Antichain<TOuter>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<TOuter>,
    desc_transformer: Option<DT>,
    key_schema: Arc<K::Schema>,
    val_schema: Arc<V::Schema>,
    filter_fn: impl FnMut(&PartStats, AntichainRef<TOuter>) -> FilterResult + 'static,
    // If Some, an override for the default listen sleep retry parameters.
    listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    StreamVec<'inner, T, FetchedBlob<K, V, TOuter, D>>,
    Vec<PressOnDropButton>,
)
where
    K: Debug + Codec,
    V: Debug + Codec,
    D: Monoid + Codec64 + Send + Sync,
    // TODO: Figure out how to get rid of the TotalOrder bound :(.
    TOuter: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
    T: Refines<TOuter>,
    DT: FnOnce(
        Scope<'inner, T>,
        StreamVec<'inner, T, (usize, ExchangeableBatchPart<TOuter>)>,
        usize,
    ) -> (
        StreamVec<'inner, T, (usize, ExchangeableBatchPart<TOuter>)>,
        Vec<PressOnDropButton>,
    ),
    C: Future<Output = PersistClient> + Send + 'static,
{
    // WARNING! If emulating any of this code, you should read the doc string on
    // [`LeasedBatchPart`] and [`Subscribe`] or will likely run into intentional
    // panics.
    //
    // This source is split as such:
    // 1. Sets up `async_stream`, which only yields data (parts) on one chosen
    //    worker. Generating also generates SeqNo leases on the chosen worker,
    //    ensuring `part`s do not get GCed while in flight.
    // 2. Part distribution: A timely source operator which continuously reads
    //    from that stream, and distributes the data among workers.
    // 3. Part fetcher: A timely operator which downloads the part's contents
    //    from S3, and outputs them to a timely stream. Additionally, the
    //    operator returns the `LeasedBatchPart` to the original worker, so it
    //    can release the SeqNo lease.

    // Deterministically pick the worker that hosts the single-threaded
    // part-discovery operator, based only on the source name.
    let chosen_worker = usize::cast_from(name.hashed()) % scope.peers();

    let mut tokens = vec![];

    // we can safely pass along a zero summary from this feedback edge,
    // as the input is disconnected from the operator's output
    let (completed_fetches_feedback_handle, completed_fetches_feedback_stream) =
        scope.feedback(T::Summary::default());

    // Sniff out if this is on behalf of a transient dataflow. This doesn't
    // affect the fetch behavior, it just causes us to use a different set of
    // metrics.
    let is_transient = !until.is_empty();

    let (descs, descs_token) = shard_source_descs::<K, V, D, TOuter>(
        outer,
        name,
        client(),
        shard_id.clone(),
        as_of,
        snapshot_mode,
        until,
        completed_fetches_feedback_stream.leave(outer),
        chosen_worker,
        Arc::clone(&key_schema),
        Arc::clone(&val_schema),
        filter_fn,
        listen_sleep,
        start_signal,
        error_handler.clone(),
    );
    tokens.push(descs_token);

    // Bring the part descriptions into the inner scope and, if requested, let
    // the caller interpose an operator (e.g. flow control) before the fetch.
    let descs = descs.enter(scope);
    let descs = match desc_transformer {
        Some(desc_transformer) => {
            let (descs, extra_tokens) = desc_transformer(scope, descs, chosen_worker);
            tokens.extend(extra_tokens);
            descs
        }
        None => descs,
    };

    let (parts, completed_fetches_stream, fetch_token) = shard_source_fetch::<K, V, TOuter, D, T>(
        descs,
        name,
        client(),
        shard_id,
        key_schema,
        val_schema,
        is_transient,
        error_handler,
    );
    // Close the feedback loop: the descs operator observes fetch progress
    // through this edge and releases the corresponding SeqNo leases.
    completed_fetches_stream.connect_loop(completed_fetches_feedback_handle);
    tokens.push(fetch_token);

    (parts, tokens)
}
252
/// An enum describing whether a snapshot should be emitted
///
/// Consulted by [`shard_source_descs`] to decide whether the parts as of the
/// initial `as_of` are emitted before the listen updates.
#[derive(Debug, Clone, Copy)]
pub enum SnapshotMode {
    /// The snapshot will be included in the stream
    Include,
    /// The snapshot will not be included in the stream
    Exclude,
}
261
/// Tracks [`Lease`]s for in-flight parts, keyed by the time at which each part
/// was emitted, so they can be dropped once the completed-fetches frontier
/// passes that time.
#[derive(Debug)]
struct LeaseManager<T> {
    // Outstanding leases, grouped by emission time. BTreeMap so that
    // `advance_to` can pop entries in time order.
    leases: BTreeMap<T, Vec<Lease>>,
}
266
267impl<T: Timestamp + Codec64> LeaseManager<T> {
268    fn new() -> Self {
269        Self {
270            leases: BTreeMap::new(),
271        }
272    }
273
274    /// Track a lease associated with a particular time.
275    fn push_at(&mut self, time: T, lease: Lease) {
276        self.leases.entry(time).or_default().push(lease);
277    }
278
279    /// Discard any leases for data that aren't past the given frontier.
280    fn advance_to(&mut self, frontier: AntichainRef<T>)
281    where
282        // If we allowed partial orders, we'd need to reconsider every key on each advance.
283        T: TotalOrder,
284    {
285        while let Some(first) = self.leases.first_entry() {
286            if frontier.less_equal(first.key()) {
287                break; // This timestamp is still live!
288            }
289            drop(first.remove());
290        }
291    }
292}
293
/// The part-discovery half of [`shard_source`]: on the single chosen worker,
/// opens a leased read handle on the shard, emits the (optional) snapshot
/// followed by listen updates as exchangeable batch-part descriptions paired
/// with a target worker index, and tracks the SeqNo leases that keep those
/// parts from being GCed while they're in flight to the fetch operator.
pub(crate) fn shard_source_descs<'outer, K, V, D, TOuter>(
    scope: Scope<'outer, TOuter>,
    name: &str,
    client: impl Future<Output = PersistClient> + Send + 'static,
    shard_id: ShardId,
    as_of: Option<Antichain<TOuter>>,
    snapshot_mode: SnapshotMode,
    until: Antichain<TOuter>,
    completed_fetches_stream: StreamVec<'outer, TOuter, Infallible>,
    chosen_worker: usize,
    key_schema: Arc<K::Schema>,
    val_schema: Arc<V::Schema>,
    mut filter_fn: impl FnMut(&PartStats, AntichainRef<TOuter>) -> FilterResult + 'static,
    // If Some, an override for the default listen sleep retry parameters.
    listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
    start_signal: impl Future<Output = ()> + 'static,
    error_handler: ErrorHandler,
) -> (
    StreamVec<'outer, TOuter, (usize, ExchangeableBatchPart<TOuter>)>,
    PressOnDropButton,
)
where
    K: Debug + Codec,
    V: Debug + Codec,
    D: Monoid + Codec64 + Send + Sync,
    // TODO: Figure out how to get rid of the TotalOrder bound :(.
    TOuter: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
{
    let worker_index = scope.index();
    let num_workers = scope.peers();

    // This is a generator that sets up an async `Stream` that can be continuously polled to get the
    // values that are `yield`-ed from its body.
    let name_owned = name.to_owned();

    // Create a shared slot between the operator to store the listen handle
    let listen_handle = Rc::new(RefCell::new(None));
    let return_listen_handle = Rc::clone(&listen_handle);

    // Create a oneshot channel to give the part returner a SubscriptionLeaseReturner
    let (tx, rx) = tokio::sync::oneshot::channel::<Rc<RefCell<LeaseManager<TOuter>>>>();
    let mut builder = AsyncOperatorBuilder::new(
        format!("shard_source_descs_return({})", name),
        scope.clone(),
    );
    let mut completed_fetches = builder.new_disconnected_input(completed_fetches_stream, Pipeline);
    // This operator doesn't need to use a token because it naturally exits when its input
    // frontier reaches the empty antichain.
    builder.build(move |_caps| async move {
        let Ok(leases) = rx.await else {
            // Either we're not the chosen worker or the dataflow was shutdown before the
            // subscriber was even created.
            return;
        };
        while let Some(event) = completed_fetches.next().await {
            let Event::Progress(frontier) = event else {
                continue;
            };
            // Every part emitted at a time before this frontier has been
            // fetched, so its lease (and thus its SeqNo hold) can be dropped.
            leases.borrow_mut().advance_to(frontier.borrow());
        }
        // Make it explicit that the subscriber is kept alive until we have finished returning parts
        drop(return_listen_handle);
    });

    let mut builder =
        AsyncOperatorBuilder::new(format!("shard_source_descs({})", name), scope.clone());
    let (descs_output, descs_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    #[allow(clippy::await_holding_refcell_ref)]
    let shutdown_button = builder.build(move |caps| async move {
        let mut cap_set = CapabilitySet::from_elem(caps.into_element());

        // Only one worker is responsible for distributing parts
        if worker_index != chosen_worker {
            trace!(
                "We are not the chosen worker ({}), exiting...",
                chosen_worker
            );
            return;
        }

        // Internally, the `open_leased_reader` call registers a new LeasedReaderId and then fires
        // up a background tokio task to heartbeat it. It is possible that we might get a
        // particularly adversarial scheduling where the CRDB query to register the id is sent and
        // then our Future is not polled again for a long time, resulting in us never spawning the
        // heartbeat task. Run reader creation in a task to attempt to defend against this.
        //
        // TODO: Really we likely need to swap the inners of all persist operators to be
        // communicating with a tokio task over a channel, but that's much much harder, so for now
        // we whack the moles as we see them.
        let mut read = mz_ore::task::spawn(|| format!("shard_source_reader({})", name_owned), {
            let diagnostics = Diagnostics {
                handle_purpose: format!("shard_source({})", name_owned),
                shard_name: name_owned.clone(),
            };
            async move {
                let client = client.await;
                client
                    .open_leased_reader::<K, V, TOuter, D>(
                        shard_id,
                        key_schema,
                        val_schema,
                        diagnostics,
                        USE_CRITICAL_SINCE_SOURCE.get(client.dyncfgs()),
                    )
                    .await
            }
        })
        .await
        .expect("could not open persist shard");

        // Wait for the start signal only after we have obtained a read handle. This makes "cannot
        // serve requested as_of" panics caused by (database-issues#8729) significantly less
        // likely.
        let () = start_signal.await;

        let cfg = read.cfg.clone();
        let metrics = Arc::clone(&read.metrics);

        // With no explicit as_of, read from the earliest time the handle allows.
        let as_of = as_of.unwrap_or_else(|| read.since().clone());

        // Eagerly downgrade our frontier to the initial as_of. This makes sure
        // that the output frontier of the `persist_source` closely tracks the
        // `upper` frontier of the persist shard. It might be that the snapshot
        // for `as_of` is not initially available yet, but this makes sure we
        // already downgrade to it.
        //
        // Downstream consumers might rely on close frontier tracking for making
        // progress. For example, the `persist_sink` needs to know the
        // up-to-date upper of the output shard to make progress because it will
        // only write out new data once it knows that earlier writes went
        // through, including the initial downgrade of the shard upper to the
        // `as_of`.
        //
        // NOTE: We have to do this before our `snapshot()` call because that
        // will block when there is no data yet available in the shard.
        cap_set.downgrade(as_of.clone());

        let mut snapshot_parts =
            match snapshot_mode {
                SnapshotMode::Include => match read.snapshot(as_of.clone()).await {
                    Ok(parts) => parts,
                    Err(e) => error_handler
                        .report_and_stop(anyhow!(
                            "{name_owned}: {shard_id} cannot serve requested as_of {as_of:?}: {e:?}"
                        ))
                        .await,
                },
                SnapshotMode::Exclude => vec![],
            };

        // We're about to start producing parts to be fetched whose leases will be returned by the
        // `shard_source_descs_return` operator above. In order for that operator to successfully
        // return the leases we send it the lease returner associated with our shared subscriber.
        let leases = Rc::new(RefCell::new(LeaseManager::new()));
        tx.send(Rc::clone(&leases))
            .expect("lease returner exited before desc producer");

        // Store the listen handle in the shared slot so that it stays alive until both operators
        // exit
        let mut listen = listen_handle.borrow_mut();
        let listen = match read.listen(as_of.clone()).await {
            Ok(handle) => listen.insert(handle),
            Err(e) => {
                error_handler
                    .report_and_stop(anyhow!(
                        "{name_owned}: {shard_id} cannot serve requested as_of {as_of:?}: {e:?}"
                    ))
                    .await
            }
        };

        let listen_retry = listen_sleep.as_ref().map(|retry| retry());

        // The head of the stream is enriched with the snapshot parts if they exist
        let listen_head = if !snapshot_parts.is_empty() {
            let (mut parts, progress) = listen.next(listen_retry).await;
            snapshot_parts.append(&mut parts);
            futures::stream::iter(Some((snapshot_parts, progress)))
        } else {
            futures::stream::iter(None)
        };

        // The tail of the stream is all subsequent parts
        let listen_tail = futures::stream::unfold(listen, |listen| async move {
            Some((listen.next(listen_retry).await, listen))
        });

        let mut shard_stream = pin!(listen_head.chain(listen_tail));

        // Ideally, we'd like our audit overhead to be proportional to the actual amount of "real"
        // work we're doing in the source. So: start with a small, constant budget; add to the
        // budget when we do real work; and skip auditing a part if we don't have the budget for it.
        let mut audit_budget_bytes = u64::cast_from(BLOB_TARGET_SIZE.get(&cfg).saturating_mul(2));

        // All future updates will be timestamped after this frontier.
        let mut current_frontier = as_of.clone();

        // If `until.less_equal(current_frontier)`, it means that all subsequent batches will contain only
        // times greater or equal to `until`, which means they can be dropped in their entirety.
        while !PartialOrder::less_equal(&until, &current_frontier) {
            let (parts, progress) = shard_stream.next().await.expect("infinite stream");

            // Emit the part at the `(ts, 0)` time. The `granular_backpressure`
            // operator will refine this further, if its enabled.
            let current_ts = current_frontier
                .as_option()
                .expect("until should always be <= the empty frontier");
            let session_cap = cap_set.delayed(current_ts);

            for mut part_desc in parts {
                // TODO: Push more of this logic into LeasedBatchPart like we've
                // done for project?
                if STATS_FILTER_ENABLED.get(&cfg) {
                    let filter_result = match &part_desc.part {
                        BatchPart::Hollow(x) => {
                            // Parts without stats can never be proven skippable.
                            let should_fetch =
                                x.stats.as_ref().map_or(FilterResult::Keep, |stats| {
                                    filter_fn(&stats.decode(), current_frontier.borrow())
                                });
                            should_fetch
                        }
                        BatchPart::Inline { .. } => FilterResult::Keep,
                    };
                    // Apply the filter: discard or substitute the part if required.
                    let bytes = u64::cast_from(part_desc.encoded_size_bytes());
                    match filter_result {
                        FilterResult::Keep => {
                            audit_budget_bytes = audit_budget_bytes.saturating_add(bytes);
                        }
                        FilterResult::Discard => {
                            metrics.pushdown.parts_filtered_count.inc();
                            metrics.pushdown.parts_filtered_bytes.inc_by(bytes);
                            // Sample a configured percentage of discarded parts
                            // (deterministically, by hashing the part key) for a
                            // filter-pushdown audit fetch.
                            let should_audit = match &part_desc.part {
                                BatchPart::Hollow(x) => {
                                    let mut h = DefaultHasher::new();
                                    x.key.hash(&mut h);
                                    usize::cast_from(h.finish()) % 100
                                        < STATS_AUDIT_PERCENT.get(&cfg)
                                }
                                BatchPart::Inline { .. } => false,
                            };
                            if should_audit && bytes < audit_budget_bytes {
                                audit_budget_bytes -= bytes;
                                metrics.pushdown.parts_audited_count.inc();
                                metrics.pushdown.parts_audited_bytes.inc_by(bytes);
                                part_desc.request_filter_pushdown_audit();
                            } else {
                                debug!(
                                    "skipping part because of stats filter {:?}",
                                    part_desc.part.stats()
                                );
                                continue;
                            }
                        }
                        FilterResult::ReplaceWith { key, val } => {
                            part_desc.maybe_optimize(&cfg, key, val);
                            audit_budget_bytes = audit_budget_bytes.saturating_add(bytes);
                        }
                    }
                    // Recompute the size: `maybe_optimize` may have changed it.
                    let bytes = u64::cast_from(part_desc.encoded_size_bytes());
                    if part_desc.part.is_inline() {
                        metrics.pushdown.parts_inline_count.inc();
                        metrics.pushdown.parts_inline_bytes.inc_by(bytes);
                    } else {
                        metrics.pushdown.parts_fetched_count.inc();
                        metrics.pushdown.parts_fetched_bytes.inc_by(bytes);
                    }
                }

                // Give the part to a random worker. This isn't round robin in an attempt to avoid
                // skew issues: if your parts alternate size large, small, then you'll end up only
                // using half of your workers.
                //
                // There's certainly some other things we could be doing instead here, but this has
                // seemed to work okay so far. Continue to revisit as necessary.
                let worker_idx = usize::cast_from(Instant::now().hashed()) % num_workers;
                let (part, lease) = part_desc.into_exchangeable_part();
                leases.borrow_mut().push_at(current_ts.clone(), lease);
                descs_output.give(&session_cap, (worker_idx, part));
            }

            current_frontier.join_assign(&progress);
            cap_set.downgrade(progress.iter());
        }
    });

    (descs_stream, shutdown_button.press_on_drop())
}
583
/// The fetch half of [`shard_source`]: receives exchangeable part descriptions
/// (each tagged with a target worker index chosen upstream), downloads each
/// part's contents, and emits the resulting [`FetchedBlob`]s.
///
/// The second returned stream never carries data (`Infallible`); only its
/// frontier matters, signaling fetch completion back to the descs operator so
/// that part leases can be released.
pub(crate) fn shard_source_fetch<'inner, K, V, T, D, TInner>(
    descs: StreamVec<'inner, TInner, (usize, ExchangeableBatchPart<T>)>,
    name: &str,
    client: impl Future<Output = PersistClient> + Send + 'static,
    shard_id: ShardId,
    key_schema: Arc<K::Schema>,
    val_schema: Arc<V::Schema>,
    is_transient: bool,
    error_handler: ErrorHandler,
) -> (
    StreamVec<'inner, TInner, FetchedBlob<K, V, T, D>>,
    StreamVec<'inner, TInner, Infallible>,
    PressOnDropButton,
)
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
    TInner: Timestamp + Refines<T>,
{
    let mut builder =
        AsyncOperatorBuilder::new(format!("shard_source_fetch({})", name), descs.scope());
    let (fetched_output, fetched_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (completed_fetches_output, completed_fetches_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();
    // Route each part to the worker index the descs operator picked for it.
    let mut descs_input = builder.new_input_for_many(
        descs,
        Exchange::new(|&(i, _): &(usize, _)| u64::cast_from(i)),
        [&fetched_output, &completed_fetches_output],
    );
    let name_owned = name.to_owned();

    let shutdown_button = builder.build(move |_capabilities| async move {
        let mut fetcher = mz_ore::task::spawn(|| format!("shard_source_fetch({})", name_owned), {
            let diagnostics = Diagnostics {
                shard_name: name_owned.clone(),
                handle_purpose: format!("shard_source_fetch batch fetcher {}", name_owned),
            };
            async move {
                client
                    .await
                    .create_batch_fetcher::<K, V, T, D>(
                        shard_id,
                        key_schema,
                        val_schema,
                        is_transient,
                        diagnostics,
                    )
                    .await
            }
        })
        .await
        .expect("shard codecs should not change");

        while let Some(event) = descs_input.next().await {
            if let Event::Data([fetched_cap, _completed_fetches_cap], data) = event {
                // `LeasedBatchPart`es cannot be dropped at this point w/o
                // panicking, so swap them to an owned version.
                for (_idx, part) in data {
                    let fetched = fetcher
                        .fetch_leased_part(part)
                        .await
                        .expect("shard_id should match across all workers");
                    let fetched = match fetched {
                        Ok(fetched) => fetched,
                        Err(blob_key) => {
                            // Ideally, readers should never encounter a missing blob. They place a seqno
                            // hold as they consume their snapshot/listen, preventing any blobs they need
                            // from being deleted by garbage collection, and all blob implementations are
                            // linearizable so there should be no possibility of stale reads.
                            //
                            // However, it is possible for a lease to expire given a sustained period of
                            // downtime, which could allow parts we expect to exist to be deleted...
                            // at which point our best option is to request a restart.
                            error_handler
                                .report_and_stop(anyhow!(
                                    "batch fetcher could not fetch batch part {}; lost lease?",
                                    blob_key
                                ))
                                .await
                        }
                    };
                    {
                        // Do very fine-grained output activation/session
                        // creation to ensure that we don't hold activated
                        // outputs or sessions across await points, which
                        // would prevent messages from being flushed from
                        // the shared timely output buffer.
                        fetched_output.give(&fetched_cap, fetched);
                    }
                }
            }
        }
    });

    (
        fetched_stream,
        completed_fetches_stream,
        shutdown_button.press_on_drop(),
    )
}
686
687#[cfg(test)]
688mod tests {
689    use super::*;
690    use std::sync::Arc;
691
692    use mz_persist::location::SeqNo;
693    use timely::dataflow::operators::Leave;
694    use timely::dataflow::operators::Probe;
695    use timely::dataflow::operators::probe::Handle as ProbeHandle;
696    use timely::progress::Antichain;
697
698    use crate::operators::shard_source::shard_source;
699    use crate::{Diagnostics, ShardId};
700
    #[mz_ore::test]
    fn test_lease_manager() {
        let lease = Lease::new(SeqNo::minimum());
        let mut manager = LeaseManager::new();
        for t in 0u64..10 {
            manager.push_at(t, lease.clone());
        }
        // The original handle plus the ten clones held by the manager.
        assert_eq!(lease.count(), 11);
        // Times strictly before the frontier (0..=4) are dropped.
        manager.advance_to(AntichainRef::new(&[5]));
        assert_eq!(lease.count(), 6);
        // Advancing to an earlier frontier drops nothing further.
        manager.advance_to(AntichainRef::new(&[3]));
        assert_eq!(lease.count(), 6);
        manager.advance_to(AntichainRef::new(&[9]));
        assert_eq!(lease.count(), 2);
        // Once the frontier passes every tracked time, only the original
        // handle keeps the lease alive.
        manager.advance_to(AntichainRef::new(&[10]));
        assert_eq!(lease.count(), 1);
    }
718
719    /// Verifies that a `shard_source` will downgrade it's output frontier to
720    /// the `since` of the shard when no explicit `as_of` is given. Even if
721    /// there is no data/no snapshot available in the
722    /// shard.
723    ///
724    /// NOTE: This test is weird: if everything is good it will pass. If we
725    /// break the assumption that we test this will time out and we will notice.
726    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
727    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
728    async fn test_shard_source_implicit_initial_as_of() {
729        let persist_client = PersistClient::new_for_tests().await;
730
731        let expected_frontier = 42;
732        let shard_id = ShardId::new();
733
734        initialize_shard(
735            &persist_client,
736            shard_id,
737            Antichain::from_elem(expected_frontier),
738        )
739        .await;
740
741        let res = timely::execute::execute_directly(move |worker| {
742            let until = Antichain::new();
743
744            let (probe, _token) = worker.dataflow::<u64, _, _>(|outer| {
745                let (stream, token) = outer.scoped::<u64, _, _>("hybrid", |scope| {
746                    let transformer = move |_, descs, _| (descs, vec![]);
747                    let (stream, tokens) = shard_source::<String, String, u64, u64, _, _, _>(
748                        outer,
749                        scope,
750                        "test_source",
751                        move || std::future::ready(persist_client.clone()),
752                        shard_id,
753                        None, // No explicit as_of!
754                        SnapshotMode::Include,
755                        until,
756                        Some(transformer),
757                        Arc::new(
758                            <std::string::String as mz_persist_types::Codec>::Schema::default(),
759                        ),
760                        Arc::new(
761                            <std::string::String as mz_persist_types::Codec>::Schema::default(),
762                        ),
763                        FilterResult::keep_all,
764                        false.then_some(|| unreachable!()),
765                        async {},
766                        ErrorHandler::Halt("test"),
767                    );
768                    (stream.leave(outer), tokens)
769                });
770
771                let probe = ProbeHandle::new();
772                let _stream = stream.probe_with(&probe);
773
774                (probe, token)
775            });
776
777            while probe.less_than(&expected_frontier) {
778                worker.step();
779            }
780
781            let mut probe_frontier = Antichain::new();
782            probe.with_frontier(|f| probe_frontier.extend(f.iter().cloned()));
783
784            probe_frontier
785        });
786
787        assert_eq!(res, Antichain::from_elem(expected_frontier));
788    }
789
790    /// Verifies that a `shard_source` will downgrade it's output frontier to
791    /// the given `as_of`. Even if there is no data/no snapshot available in the
792    /// shard.
793    ///
794    /// NOTE: This test is weird: if everything is good it will pass. If we
795    /// break the assumption that we test this will time out and we will notice.
796    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
797    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
798    async fn test_shard_source_explicit_initial_as_of() {
799        let persist_client = PersistClient::new_for_tests().await;
800
801        let expected_frontier = 42;
802        let shard_id = ShardId::new();
803
804        initialize_shard(
805            &persist_client,
806            shard_id,
807            Antichain::from_elem(expected_frontier),
808        )
809        .await;
810
811        let res = timely::execute::execute_directly(move |worker| {
812            let as_of = Antichain::from_elem(expected_frontier);
813            let until = Antichain::new();
814
815            let (probe, _token) = worker.dataflow::<u64, _, _>(|outer| {
816                let (stream, token) = outer.scoped::<u64, _, _>("hybrid", |scope| {
817                    let transformer = move |_, descs, _| (descs, vec![]);
818                    let (stream, tokens) = shard_source::<String, String, u64, u64, _, _, _>(
819                        outer,
820                        scope,
821                        "test_source",
822                        move || std::future::ready(persist_client.clone()),
823                        shard_id,
824                        Some(as_of), // We specify the as_of explicitly!
825                        SnapshotMode::Include,
826                        until,
827                        Some(transformer),
828                        Arc::new(
829                            <std::string::String as mz_persist_types::Codec>::Schema::default(),
830                        ),
831                        Arc::new(
832                            <std::string::String as mz_persist_types::Codec>::Schema::default(),
833                        ),
834                        FilterResult::keep_all,
835                        false.then_some(|| unreachable!()),
836                        async {},
837                        ErrorHandler::Halt("test"),
838                    );
839                    (stream.leave(outer), tokens)
840                });
841
842                let probe = ProbeHandle::new();
843                let _stream = stream.probe_with(&probe);
844
845                (probe, token)
846            });
847
848            while probe.less_than(&expected_frontier) {
849                worker.step();
850            }
851
852            let mut probe_frontier = Antichain::new();
853            probe.with_frontier(|f| probe_frontier.extend(f.iter().cloned()));
854
855            probe_frontier
856        });
857
858        assert_eq!(res, Antichain::from_elem(expected_frontier));
859    }
860
861    async fn initialize_shard(
862        persist_client: &PersistClient,
863        shard_id: ShardId,
864        since: Antichain<u64>,
865    ) {
866        let mut read_handle = persist_client
867            .open_leased_reader::<String, String, u64, u64>(
868                shard_id,
869                Arc::new(<std::string::String as mz_persist_types::Codec>::Schema::default()),
870                Arc::new(<std::string::String as mz_persist_types::Codec>::Schema::default()),
871                Diagnostics::for_tests(),
872                true,
873            )
874            .await
875            .expect("invalid usage");
876
877        read_handle.downgrade_since(&since).await;
878    }
879}