1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

//! Types for cluster-internal control messages that can be broadcast to all
//! workers from individual operators/workers.

use std::collections::BTreeMap;
use std::time::Instant;

use mz_repr::{GlobalId, Row};
use mz_rocksdb::config::SharedWriteBufferManager;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::parameters::StorageParameters;
use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc};
use mz_storage_types::sources::IngestionDescription;
use serde::{Deserialize, Serialize};
use timely::communication::Allocate;
use timely::progress::Antichain;
use timely::synchronization::Sequencer;
use timely::worker::Worker as TimelyWorker;

use crate::statistics::{SinkStatisticsRecord, SourceStatisticsRecord};

/// _Dynamic_ storage instance configuration parameters that are used during dataflow rendering.
/// Changes to these parameters are applied to `StorageWorker`s in a consistent order
/// with source and sink creation.
///
/// Instances are built with [`DataflowParameters::new`] and refreshed in place with
/// [`DataflowParameters::update`].
#[derive(Debug)]
pub struct DataflowParameters {
    /// Configuration/tuning for RocksDB. This also contains
    /// some shared objects, which is why it's separate.
    pub upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBConfig,
}

impl DataflowParameters {
    /// Creates a new instance of `DataflowParameters` from the given shared RocksDB write
    /// buffer manager and the (optional) cluster memory limit.
    ///
    /// Both arguments are forwarded unchanged to `mz_rocksdb::RocksDBConfig::new`, which
    /// produces the initial `upsert_rocksdb_tuning_config`.
    pub fn new(
        shared_rocksdb_write_buffer_manager: SharedWriteBufferManager,
        cluster_memory_limit: Option<usize>,
    ) -> Self {
        Self {
            upsert_rocksdb_tuning_config: mz_rocksdb::RocksDBConfig::new(
                shared_rocksdb_write_buffer_manager,
                cluster_memory_limit,
            ),
        }
    }

    /// Update the `DataflowParameters` with new configuration.
    ///
    /// Only the `upsert_rocksdb_tuning_config` field of `storage_parameters` is consulted;
    /// it is applied on top of the existing RocksDB configuration. All other fields of
    /// `storage_parameters` are ignored by this method.
    pub fn update(&mut self, storage_parameters: StorageParameters) {
        // `storage_parameters` is owned by this call, so the tuning config can be moved out
        // of it directly instead of being cloned first.
        self.upsert_rocksdb_tuning_config
            .apply(storage_parameters.upsert_rocksdb_tuning_config);
    }
}

/// Internal commands that can be sent by individual operators/workers that will
/// be broadcast to all workers. The worker main loop will receive those and act
/// on them.
///
/// NOTE(review): this type derives `Serialize`/`Deserialize`; depending on the
/// wire format in use, reordering variants or fields may change the encoding.
/// Confirm before reordering.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub enum InternalStorageCommand {
    /// Suspend and restart the dataflow identified by the `GlobalId`.
    SuspendAndRestart {
        /// The id of the dataflow that should be restarted.
        id: GlobalId,
        /// The reason for the restart request.
        reason: String,
    },
    /// Render an ingestion dataflow at the given resumption frontier.
    CreateIngestionDataflow {
        /// ID of the ingestion/source.
        id: GlobalId,
        /// The description of the ingestion/source.
        ingestion_description: IngestionDescription<CollectionMetadata>,
        /// The frontier beyond which ingested updates should be uncompacted. Inputs to the
        /// ingestion are guaranteed to be readable at this frontier.
        as_of: Antichain<mz_repr::Timestamp>,
        /// Frontiers in the Materialize time domain, keyed by `GlobalId`. Each frontier has
        /// the property that all updates not beyond it have already been durably ingested.
        resume_uppers: BTreeMap<GlobalId, Antichain<mz_repr::Timestamp>>,
        /// Frontiers in the source time domain, keyed by `GlobalId` and represented as
        /// `Row`s. Each frontier has the property that all updates not beyond it have
        /// already been durably ingested.
        source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
    },
    /// Render a sink dataflow.
    ///
    /// Carries a `GlobalId` (presumably the ID of the sink -- confirm against the worker
    /// main loop) together with the description of the sink.
    RunSinkDataflow(
        GlobalId,
        StorageSinkDesc<MetadataFilled, mz_repr::Timestamp>,
    ),
    /// Drop all state and operators for a dataflow. This is a vec because some
    /// dataflows have their state spread over multiple IDs (i.e. sources that
    /// spawn subsources); this means that actions taken in response to this
    /// command should be permissive about missing state.
    DropDataflow(Vec<GlobalId>),

    /// Update the configuration for rendering dataflows.
    UpdateConfiguration {
        /// The new configuration parameters.
        storage_parameters: StorageParameters,
    },
    /// For moving statistics updates to worker 0.
    StatisticsUpdate {
        /// Local source statistics, with their epochs.
        sources: Vec<(usize, SourceStatisticsRecord)>,
        /// Local sink statistics, with their epochs.
        sinks: Vec<(usize, SinkStatisticsRecord)>,
    },
}

/// Allows broadcasting [`internal commands`](InternalStorageCommand) to all
/// workers, and reading back the commands that are available.
pub trait InternalCommandSender {
    /// Broadcasts the given command to all workers.
    fn broadcast(&mut self, internal_cmd: InternalStorageCommand);

    /// Returns the next available command, if any. This returns `None` when
    /// there are currently no commands but there might be commands again in the
    /// future.
    ///
    /// Unlike an exhausted iterator, a `None` here is therefore not a signal
    /// that the stream of commands has ended.
    fn next(&mut self) -> Option<InternalStorageCommand>;
}

impl InternalCommandSender for Sequencer<InternalStorageCommand> {
    fn broadcast(&mut self, internal_cmd: InternalStorageCommand) {
        self.push(internal_cmd);
    }

    fn next(&mut self) -> Option<InternalStorageCommand> {
        Iterator::next(self)
    }
}

/// Creates the [`Sequencer`] of [`InternalStorageCommand`]s for the given timely worker.
///
/// The sequencer is constructed with the current [`Instant`] as its timer argument.
pub(crate) fn setup_command_sequencer<A: Allocate>(
    timely_worker: &mut TimelyWorker<A>,
) -> Sequencer<InternalStorageCommand> {
    // TODO(aljoscha): Use something based on `mz_ore::NowFn`?
    Sequencer::new(timely_worker, Instant::now())
}