// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use futures::StreamExt;
use mz_repr::GlobalId;
use mz_storage_types::sources::SourceTimestamp;
use mz_timely_util::builder_async::{Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::{Scope, Stream};

use crate::source::types::ProgressStatisticsUpdate;
use crate::statistics::SourceStatistics;

/// Builds a dataflow operator that drains `stats_stream` and publishes each
/// [`ProgressStatisticsUpdate`] it carries into `source_statistics`.
///
/// The operator is purely a sink: it registers a disconnected input, holds no
/// capabilities, and produces no output stream.
pub fn process_statistics<G, FromTime>(
    scope: G,
    source_id: GlobalId,
    worker_id: usize,
    stats_stream: Stream<G, ProgressStatisticsUpdate>,
    source_statistics: SourceStatistics,
) where
    G: Scope<Timestamp = FromTime>,
    FromTime: SourceTimestamp,
{
    let operator_name = format!("SourceProgressStats({})", source_id);
    let mut builder = AsyncOperatorBuilder::new(operator_name, scope.clone());

    let mut input = builder.new_disconnected_input(&stats_stream, Pipeline);

    builder.build(move |capabilities| async move {
        // This operator never emits output, so its capabilities are useless.
        drop(capabilities);

        while let Some(event) = input.next().await {
            let data = match event {
                AsyncEvent::Data(_, data) => data,
                // Progress events carry nothing we need here.
                _ => continue,
            };
            tracing::debug!(
                ?data,
                %source_id,
                %worker_id,
                "timely-{worker_id} received \
                    source progress statistics update"
            );

            for update in data {
                match update {
                    ProgressStatisticsUpdate::Snapshot {
                        records_known,
                        records_staged,
                    } => {
                        source_statistics.set_snapshot_records_known(records_known);
                        source_statistics.set_snapshot_records_staged(records_staged);
                    }
                    ProgressStatisticsUpdate::SteadyState {
                        offset_known,
                        offset_committed,
                    } => {
                        // `offset_known` may momentarily sit below
                        // `offset_committed` for two reasons:
                        // - A source implementation only periodically fetches
                        //   `offset_known`, while `offset_committed` advances with
                        //   the data it receives. Kafka sources behave this way.
                        // - Some irrecoverable restore has regressed
                        //   `offset_known`. Postgres sources can hit this, and
                        //   these metrics are NOT the intended signal for that
                        //   failure (source status is).
                        // Clamp so the reported known offset never trails the
                        // committed offset.
                        let offset_known = offset_known.max(offset_committed);

                        source_statistics.set_offset_known(offset_known);
                        source_statistics.set_offset_committed(offset_committed);
                    }
                }
            }
        }
    });
}