pub fn decode_and_mfp<G, YFn>(
    fetched: &Stream<G, FetchedPart<SourceData, (), Timestamp, Diff>>,
    name: &str,
    until: Antichain<Timestamp>,
    map_filter_project: Option<&mut MfpPlan>,
    yield_fn: YFn
) -> Stream<G, (Result<Row, DataflowError>, G::Timestamp, Diff)>where
    G: Scope<Timestamp = (Timestamp, u64)>,
    YFn: Fn(Instant, usize) -> bool + 'static,