1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A tokio task (and support machinery) for producing storage statistics.

use std::any::Any;
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use itertools::Itertools;
use mz_repr::{GlobalId, Row};
use timely::progress::ChangeBatch;
use tokio::sync::oneshot;

use crate::client::PackableStats;
use crate::controller::collection_mgmt::CollectionManager;

/// Spawns a task that continually (at an interval) writes statistics from storaged's
/// that are consolidated in shared memory in the controller.
pub(super) fn spawn_statistics_scraper<Stats: PackableStats + Send + 'static>(
    statistics_collection_id: GlobalId,
    collection_mgmt: CollectionManager,
    shared_stats: Arc<Mutex<BTreeMap<GlobalId, BTreeMap<usize, Stats>>>>,
) -> Box<dyn Any + Send + Sync> {
    // TODO(guswynn): Should this be configurable? Maybe via LaunchDarkly?
    const STATISTICS_INTERVAL: Duration = Duration::from_secs(30);

    let (shutdown_tx, mut shutdown_rx) = oneshot::channel::<()>();

    mz_ore::task::spawn(|| "statistics_scraper", async move {
        // Keep track of what we think is the contents of the output
        // collection, so that we can emit the required retractions/updates
        // when we learn about new metrics.
        //
        // We assume that `shared_stats` is kept up-to-date by the controller.
        let mut current_metrics = ChangeBatch::new();

        let mut interval = tokio::time::interval(STATISTICS_INTERVAL);
        interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

        loop {
            tokio::select! {
                _msg = &mut shutdown_rx => {
                    break;
                }

                _ = interval.tick() => {
                    let mut row_buf = Row::default();
                    let mut correction = current_metrics
                        .iter()
                        .cloned()
                        .map(|(row, diff)| (row, -diff))
                        .collect_vec();

                    // Ideally we move quickly when holding the lock here, as it can hold
                    // up the coordinator. Because we are just moving some data around, we should
                    // be fine!
                    //
                    // TODO: consider using a RwLock instead of a mutex.
                    {
                        let shared_stats = shared_stats.lock().expect("poisoned");

                        for (_, items) in shared_stats.iter() {
                            for (_, stats) in items.iter() {
                                stats.pack(row_buf.packer());
                                correction.push((row_buf.clone(), 1));
                            }
                        }
                    }

                    // Update our view of the output collection and write updates
                    // out to the collection.
                    if !correction.is_empty() {
                        current_metrics.extend(correction.iter().cloned());
                        collection_mgmt
                            .append_to_collection(statistics_collection_id, correction)
                            .await;
                    }
                }
            }
        }

        tracing::info!("shutting down statistics sender task");
    });

    Box::new(shutdown_tx)
}