fn handle_message<S: SourceReader>(
    message: SourceMessage<S::Key, S::Value, S::Diff>,
    bytes_read: &mut usize,
    cap_set: &CapabilitySet<Timestamp>,
    output: &mut OutputHandle<'_, Timestamp, Result<SourceOutput<S::Key, S::Value, S::Diff>, SourceError>, Tee<Timestamp, Result<SourceOutput<S::Key, S::Value, S::Diff>, SourceError>>>,
    metric_updates: &mut HashMap<PartitionId, (MzOffset, Timestamp, i64)>,
    ts: Timestamp
)

Takes `message`, assigns it the appropriate timestamp, and pushes it into the dataflow layer, if possible.

TODO: This function is a bit of a mess right now, but hopefully it makes the existing mess more obvious and points towards ways to improve it.
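
The flow described above can be illustrated with a minimal, self-contained sketch. It is not the actual implementation. `SourceMessage`, `Output`, and `handle_message_sketch` are hypothetical simplified stand-ins, the capability handling done through `cap_set` is left out entirely, and the reading of the `metric_updates` tuple as (latest offset, latest timestamp, message count) is an assumption made for illustration only.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the real types; chosen only to keep the sketch small.
type Timestamp = u64;
type PartitionId = u32;
type MzOffset = u64;

/// Simplified message: the real `SourceMessage` is generic over key, value, and diff.
struct SourceMessage {
    partition: PartitionId,
    offset: MzOffset,
    payload: Vec<u8>,
}

/// Simplified stand-in for the dataflow output handle.
/// It just collects `(timestamp, payload)` pairs.
#[derive(Default)]
struct Output {
    sent: Vec<(Timestamp, Vec<u8>)>,
}

/// Sketch of the flow: account for the bytes, record per-partition metrics,
/// then emit the message at timestamp `ts`.
fn handle_message_sketch(
    message: SourceMessage,
    bytes_read: &mut usize,
    output: &mut Output,
    metric_updates: &mut HashMap<PartitionId, (MzOffset, Timestamp, i64)>,
    ts: Timestamp,
) {
    // Track how many payload bytes have been consumed so far.
    *bytes_read += message.payload.len();

    // Assumed meaning of the tuple: (latest offset, latest timestamp, message count).
    let entry = metric_updates
        .entry(message.partition)
        .or_insert((message.offset, ts, 0));
    entry.0 = message.offset;
    entry.1 = ts;
    entry.2 += 1;

    // "Push into the dataflow layer": here, append to the stand-in output.
    output.sent.push((ts, message.payload));
}
```

In the real function the "if possible" part presumably depends on `cap_set`, since timely dataflow requires a capability for the timestamp before data can be emitted at it. The sketch skips that check and always emits.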