pub(crate) fn shard_source_descs<K, V, D, F, G>(
scope: &G,
name: &str,
client: impl Future<Output = PersistClient> + Send + 'static,
shard_id: ShardId,
as_of: Option<Antichain<G::Timestamp>>,
snapshot_mode: SnapshotMode,
until: Antichain<G::Timestamp>,
completed_fetches_stream: Stream<G, Infallible>,
chosen_worker: usize,
key_schema: Arc<K::Schema>,
val_schema: Arc<V::Schema>,
should_fetch_part: F,
listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
start_signal: impl Future<Output = ()> + 'static,
error_handler: impl FnOnce(String) -> Pin<Box<dyn Future<Output = ()>>> + 'static,
project: ProjectionPushdown,
) -> (Stream<G, (usize, SerdeLeasedBatchPart)>, PressOnDropButton)