1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A client that maintains summaries of the involved objects.

use crate::client::{Client, Command, ComputeCommand, ComputeResponse, Response, StorageCommand};
use expr::GlobalId;
use repr::Timestamp;
use timely::progress::{frontier::AntichainRef, Antichain, ChangeBatch};

/// A client that maintains soft state and validates commands, in addition to forwarding them.
pub struct Controller<C> {
    /// The underlying client, to which every command is forwarded and from which every
    /// response is read.
    client: C,
    /// Descriptions of the sources that have been created, keyed by identifier.
    ///
    /// A `Some` value holds the description the source was created with. A `None` value
    /// marks a source that was created and subsequently dropped; attempting to create
    /// that identifier again panics. Dropping an identifier that was never created is
    /// logged as an error and leaves no entry here.
    source_descriptions: std::collections::BTreeMap<GlobalId, Option<crate::sources::SourceDesc>>,
    /// Tracks `since` and `upper` frontiers for indexes and sinks, as well as for the
    /// identifiers announced by a logging configuration.
    compute_since_uppers: SinceUpperMap,
    /// Tracks `since` and `upper` frontiers for sources and tables.
    ///
    /// NOTE(review): within this file only `since` is ever advanced for these entries;
    /// `upper` keeps the value it was initialized with.
    storage_since_uppers: SinceUpperMap,
}

#[async_trait::async_trait]
impl<C: Client> Client for Controller<C> {
    async fn send(&mut self, cmd: Command) {
        self.send(cmd).await
    }
    async fn recv(&mut self) -> Option<Response> {
        self.recv().await
    }
}

impl<C: Client> Controller<C> {
    pub async fn send(&mut self, cmd: Command) {
        match &cmd {
            Command::Storage(StorageCommand::CreateSources(bindings)) => {
                // Maintain the list of bindings, and complain if one attempts to either
                // 1. create a dropped source identifier, or
                // 2. create an existing source identifier with a new description.
                for (id, (description, since)) in bindings.iter() {
                    let description = Some(description.clone());
                    match self.source_descriptions.get(&id) {
                        Some(None) => {
                            panic!("Attempt to recreate dropped source id: {:?}", id);
                        }
                        Some(prior_description) => {
                            if prior_description != &description {
                                panic!(
                                    "Multiple distinct descriptions created for source id {}: {:?} and {:?}",
                                    id,
                                    prior_description.as_ref().unwrap(),
                                    description.as_ref().unwrap(),
                                );
                            }
                        }
                        None => {
                            // All is well; no reason to panic.
                        }
                    }

                    self.source_descriptions.insert(*id, description.clone());
                    // We start tracking `upper` at 0; correct this should that change (e.g. to `as_of`).
                    self.storage_since_uppers
                        .insert(*id, (since.clone(), Antichain::from_elem(0)));
                }
            }
            Command::Storage(StorageCommand::DropSources(identifiers)) => {
                for id in identifiers.iter() {
                    if !self.source_descriptions.contains_key(id) {
                        tracing::error!("Source id {} dropped without first being created", id);
                    } else {
                        self.source_descriptions.insert(*id, None);
                    }
                }
            }
            Command::Compute(ComputeCommand::EnableLogging(logging_config), _instance) => {
                for id in logging_config.log_identifiers() {
                    self.compute_since_uppers
                        .insert(id, (Antichain::from_elem(0), Antichain::from_elem(0)));
                }
            }
            Command::Compute(ComputeCommand::CreateDataflows(dataflows), _instance) => {
                // Validate dataflows as having inputs whose `since` is less or equal to the dataflow's `as_of`.
                // Start tracking frontiers for each dataflow, using its `as_of` for each index and sink.
                for dataflow in dataflows.iter() {
                    let as_of = dataflow
                        .as_of
                        .as_ref()
                        .expect("Dataflow constructed without as_of set");

                    // Validate sources have `since.less_equal(as_of)`.
                    // TODO(mcsherry): Instead, return an error from the constructing method.
                    for (source_id, _) in dataflow.source_imports.iter() {
                        let (since, _upper) = self
                            .source_since_upper_for(*source_id)
                            .expect("Source frontiers absent in dataflow construction");
                        assert!(<_ as timely::order::PartialOrder>::less_equal(
                            &since,
                            &as_of.borrow()
                        ));
                    }

                    // Validate indexes have `since.less_equal(as_of)`.
                    // TODO(mcsherry): Instead, return an error from the constructing method.
                    for (index_id, _) in dataflow.index_imports.iter() {
                        let (since, _upper) = self
                            .index_since_upper_for(*index_id)
                            .expect("Index frontiers absent in dataflow construction");
                        assert!(<_ as timely::order::PartialOrder>::less_equal(
                            &since,
                            &as_of.borrow()
                        ));
                    }

                    for (sink_id, _) in dataflow.sink_exports.iter() {
                        // We start tracking `upper` at 0; correct this should that change (e.g. to `as_of`).
                        self.compute_since_uppers
                            .insert(*sink_id, (as_of.clone(), Antichain::from_elem(0)));
                    }
                    for (index_id, _, _) in dataflow.index_exports.iter() {
                        // We start tracking `upper` at 0; correct this should that change (e.g. to `as_of`).
                        self.compute_since_uppers
                            .insert(*index_id, (as_of.clone(), Antichain::from_elem(0)));
                    }
                }
            }
            Command::Compute(ComputeCommand::AllowIndexCompaction(frontiers), _instance) => {
                for (id, frontier) in frontiers.iter() {
                    self.compute_since_uppers.advance_since_for(*id, frontier);
                }
            }
            Command::Storage(StorageCommand::AllowSourceCompaction(frontiers)) => {
                for (id, frontier) in frontiers.iter() {
                    self.storage_since_uppers.advance_since_for(*id, frontier);
                }
            }

            _ => {}
        }
        self.client.send(cmd).await
    }

    pub async fn recv(&mut self) -> Option<Response> {
        let response = self.client.recv().await;

        if let Some(response) = response.as_ref() {
            match response {
                Response::Compute(ComputeResponse::FrontierUppers(updates)) => {
                    for (id, changes) in updates.iter() {
                        self.compute_since_uppers.update_upper_for(*id, changes);
                    }
                }
                _ => {}
            }
        }

        response
    }
}

impl<C> Controller<C> {
    /// Create a new controller from a client it should wrap.
    pub fn new(client: C) -> Self {
        Self {
            client,
            source_descriptions: Default::default(),
            compute_since_uppers: Default::default(),
            storage_since_uppers: Default::default(),
        }
    }
    /// Returns the source description for a given identifier.
    ///
    /// The response does not distinguish between an as yet uncreated source description,
    /// and one that has been created and then dropped (or dropped without creation).
    /// There is a distinction and the client is aware of it, and could plausibly return
    /// this information if we had a use for it.
    pub fn source_description_for(&self, id: GlobalId) -> Option<&crate::sources::SourceDesc> {
        self.source_descriptions.get(&id).unwrap_or(&None).as_ref()
    }

    /// Returns the pair of `since` and `upper` for a maintained index, if it exists.
    ///
    /// The `since` frontier indicates that the maintained data are certainly valid for times greater
    /// or equal to the frontier, but they may not be for other times. Attempting to create a dataflow
    /// using this `id` with an `as_of` that is not at least `since` will result in an error.
    ///
    /// The `upper` frontier indicates that the data are reported available for all times not greater
    /// or equal to the frontier. Dataflows with an `as_of` greater or equal to this frontier may not
    /// immediately produce results.
    pub fn index_since_upper_for(
        &self,
        id: GlobalId,
    ) -> Option<(AntichainRef<Timestamp>, AntichainRef<Timestamp>)> {
        self.compute_since_uppers.get(&id)
    }

    /// Returns the pair of `since` and `upper` for a source, if it exists.
    ///
    /// The `since` frontier indicates that the maintained data are certainly valid for times greater
    /// or equal to the frontier, but they may not be for other times. Attempting to create a dataflow
    /// using this `id` with an `as_of` that is not at least `since` will result in an error.
    ///
    /// The `upper` frontier indicates that the data are reported available for all times not greater
    /// or equal to the frontier. Dataflows with an `as_of` greater or equal to this frontier may not
    /// immediately produce results.
    pub fn source_since_upper_for(
        &self,
        id: GlobalId,
    ) -> Option<(AntichainRef<Timestamp>, AntichainRef<Timestamp>)> {
        self.storage_since_uppers.get(&id)
    }
}

/// A map from identifiers to the pair of `since` and `upper` frontiers tracked for them.
///
/// The default value is an empty map.
#[derive(Default)]
struct SinceUpperMap {
    /// For each tracked identifier, its `(since, upper)` frontiers, in that order.
    since_uppers:
        std::collections::BTreeMap<GlobalId, (Antichain<Timestamp>, Antichain<Timestamp>)>,
}

impl SinceUpperMap {
    fn insert(&mut self, id: GlobalId, since_upper: (Antichain<Timestamp>, Antichain<Timestamp>)) {
        self.since_uppers.insert(id, since_upper);
    }
    fn get(&self, id: &GlobalId) -> Option<(AntichainRef<Timestamp>, AntichainRef<Timestamp>)> {
        self.since_uppers
            .get(id)
            .map(|(since, upper)| (since.borrow(), upper.borrow()))
    }
    fn advance_since_for(&mut self, id: GlobalId, frontier: &Antichain<Timestamp>) {
        if let Some((since, _upper)) = self.since_uppers.get_mut(&id) {
            use differential_dataflow::lattice::Lattice;
            since.join_assign(frontier);
        } else {
            // If we allow compaction before the item is created, pre-restrict the valid range.
            // We start tracking `upper` at 0; correct this should that change (e.g. to `as_of`).
            self.since_uppers
                .insert(id, (frontier.clone(), Antichain::from_elem(0)));
        }
    }
    fn update_upper_for(&mut self, id: GlobalId, changes: &ChangeBatch<Timestamp>) {
        if let Some((_since, upper)) = self.since_uppers.get_mut(&id) {
            // Apply `changes` to `upper`.
            let mut changes = changes.clone();
            for time in upper.elements().iter() {
                changes.update(time.clone(), 1);
            }
            upper.clear();
            for (time, count) in changes.drain() {
                assert_eq!(count, 1);
                upper.insert(time);
            }
        } else {
            // No panic, as we could have recently dropped this.
            // If we can tell these are updates to an id that could still be constructed,
            // something is weird and we should error.
        }
    }
}