fn fetch_end_offset<C>(
    consumer: &BaseConsumer<C>,
    topic: &str,
    pid: i32
) -> Result<i64, PlanError>where
    C: ConsumerContext,