fn subscribe<G>(
    sinked_collection: Collection<G, Row, Diff>,
    err_collection: Collection<G, DataflowError, Diff>,
    sink_id: GlobalId,
    with_snapshot: bool,
    as_of: Antichain<G::Timestamp>,
    up_to: Antichain<G::Timestamp>,
    subscribe_protocol_handle: Rc<RefCell<Option<SubscribeProtocol>>>
)where
    G: Scope<Timestamp = Timestamp>,