// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A tokio task (and support machinery) for producing storage statistics.

use std::any::Any;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use differential_dataflow::lattice::Lattice;
use itertools::Itertools;
use mz_ore::now::EpochMillis;
use mz_persist_types::Codec64;
use mz_repr::TimestampManipulation;
use mz_repr::{GlobalId, Row};
use timely::progress::ChangeBatch;
use timely::progress::Timestamp;
use tokio::sync::oneshot;

use crate::client::PackableStats;
use crate::controller::collection_mgmt::CollectionManager;

/// Tracks the lifecycle of statistics objects
/// in the controller.
///
/// When sources/sinks are dropped, some state (including statistics) is
/// cleaned up later, as part of processing in the storage controller. This
/// means that a new statistics update could arrive between `DROP SOURCE`
/// and that cleanup, which is why we need to distinguish "not yet initialized"
/// from "explicitly removed" (absent from the map), so that we no longer
/// consolidate stats for a removed item into the `shared_stats` shared map.
#[derive(Debug)]
pub(super) struct StatsInitState<T>(pub BTreeMap<usize, T>);

impl<T> StatsInitState<T> {
    /// Set the value for the given worker id, overwriting it if it already
    /// exists, and doing nothing if the item has been removed (i.e. `this` is
    /// `None`).
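    ///
    /// A minimal sketch of the intended usage (the id and stats type here are
    /// hypothetical):
    ///
    /// ```ignore
    /// let id = GlobalId::User(1);
    /// let mut shared: BTreeMap<GlobalId, StatsInitState<u64>> = BTreeMap::new();
    /// shared.insert(id, StatsInitState(BTreeMap::new()));
    ///
    /// // While the entry exists, updates from workers are recorded.
    /// StatsInitState::set_if_not_removed(shared.get_mut(&id), /* worker_id */ 0, 42);
    ///
    /// // Once the item has been removed (e.g. as part of `DROP SOURCE`
    /// // cleanup), late updates are silently dropped.
    /// shared.remove(&id);
    /// StatsInitState::set_if_not_removed(shared.get_mut(&id), 0, 43); // no-op
    /// ```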
    pub(super) fn set_if_not_removed(this: Option<&mut Self>, worker_id: usize, val: T) {
        match this {
            Some(StatsInitState(map)) => {
                map.insert(worker_id, val);
            }
            None => {}
        }
    }
}

/// Spawns a task that continually (at an interval) writes out statistics that
/// `storaged` processes report and that are consolidated in shared memory in
/// the controller.
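///
/// A sketch of a hypothetical call site (the variables are illustrative and
/// assumed to already exist in the controller):
///
/// ```ignore
/// let shared_stats = Arc::new(Mutex::new(BTreeMap::new()));
/// let scraper_token = spawn_statistics_scraper(
///     statistics_collection_id,
///     collection_mgmt.clone(),
///     Arc::clone(&shared_stats),
/// );
/// // Dropping `scraper_token` shuts the task down.
/// ```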
pub(super) fn spawn_statistics_scraper<Stats, T>(
    statistics_collection_id: GlobalId,
    collection_mgmt: CollectionManager<T>,
    shared_stats: Arc<Mutex<BTreeMap<GlobalId, StatsInitState<Stats>>>>,
) -> Box<dyn Any + Send + Sync>
where
    Stats: PackableStats + Debug + Send + 'static,
    T: Timestamp + Lattice + Codec64 + From<EpochMillis> + TimestampManipulation,
{
    // TODO(guswynn): Should this be configurable? Maybe via LaunchDarkly?
    const STATISTICS_INTERVAL: Duration = Duration::from_secs(30);

    let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();

    mz_ore::task::spawn(|| "statistics_scraper", async move {
        // Keep track of what we think is the contents of the output
        // collection, so that we can emit the required retractions/updates
        // when we learn about new metrics.
        //
        // We assume that `shared_stats` is kept up-to-date by the controller.
        let mut current_metrics = ChangeBatch::new();

        let mut interval = tokio::time::interval(STATISTICS_INTERVAL);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                _msg = &mut shutdown_rx => {
                    break;
                }

                _ = interval.tick() => {
                    let mut row_buf = Row::default();
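                    // Seed the batch with retractions of everything we emitted
                    // last time; the current values packed below re-add the
                    // rows that are still live.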
                    let mut correction = current_metrics
                        .iter()
                        .cloned()
                        .map(|(row, diff)| (row, -diff))
                        .collect_vec();

                    // Ideally we hold the lock here for as short a time as
                    // possible, as holding it can hold up the coordinator.
                    // Because we are just copying some data around, we should
                    // be fine!
                    {
                        let shared_stats = shared_stats.lock().expect("poisoned");
                        for stats in shared_stats.values() {
                            for stat in stats.0.values() {
                                stat.pack(row_buf.packer());
                                correction.push((row_buf.clone(), 1));
                            }
                        }
                    }

                    // Update our view of the output collection and write updates
                    // out to the collection.
                    if !correction.is_empty() {
                        current_metrics.extend(correction.iter().cloned());
                        collection_mgmt
                            .append_to_collection(statistics_collection_id, correction)
                            .await;
                    }
                }
            }
        }

        tracing::info!("shutting down statistics sender task");
    });

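    // Dropping this token drops the oneshot sender, which causes the receiver
    // in the `select!` above to resolve and the task to shut down.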
    Box::new(shutdown_tx)
}