pub(crate) fn shard_source_fetch<K, V, T, D, G>(
descs: &Stream<G, (usize, SerdeLeasedBatchPart)>,
name: &str,
client: impl Future<Output = PersistClient> + Send + 'static,
shard_id: ShardId,
key_schema: Arc<K::Schema>,
val_schema: Arc<V::Schema>,
is_transient: bool,
) -> (Stream<G, FetchedBlob<K, V, T, D>>, Stream<G, Infallible>, PressOnDropButton)