// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! A client that maintains summaries of the involved objects.
use crate::client::{Client, Command, ComputeCommand, ComputeResponse, Response, StorageCommand};
use expr::GlobalId;
use repr::Timestamp;
use timely::progress::{frontier::AntichainRef, Antichain, ChangeBatch};
/// A client that maintains soft state and validates commands, in addition to forwarding them.
pub struct Controller<C> {
/// The underlying client.
client: C,
/// Sources that have been created.
///
/// A `None` variant means that the source was created and subsequently dropped;
/// identifiers that were never created are absent from the map entirely.
source_descriptions: std::collections::BTreeMap<GlobalId, Option<crate::sources::SourceDesc>>,
/// Tracks `since` and `upper` frontiers for indexes and sinks.
compute_since_uppers: SinceUpperMap,
/// Tracks `since` and `upper` frontiers for sources and tables.
storage_since_uppers: SinceUpperMap,
}
#[async_trait::async_trait]
impl<C: Client> Client for Controller<C> {
async fn send(&mut self, cmd: Command) {
// Resolves to the inherent `Controller::send` (inherent methods take precedence
// over trait methods), which validates and records state before forwarding.
self.send(cmd).await
}
async fn recv(&mut self) -> Option<Response> {
// Resolves to the inherent `Controller::recv`, which updates tracked frontiers.
self.recv().await
}
}
impl<C: Client> Controller<C> {
    /// Validates `cmd`, records its effect on the tracked object state, and forwards it
    /// to the underlying client.
    ///
    /// # Panics
    ///
    /// Panics if `cmd` attempts to recreate a dropped source, to redefine an existing
    /// source with a distinct description, or to construct a dataflow without an `as_of`
    /// or with an `as_of` not in advance of the `since` frontier of each imported object.
    pub async fn send(&mut self, cmd: Command) {
        match &cmd {
            Command::Storage(StorageCommand::CreateSources(bindings)) => {
                // Maintain the list of bindings, and complain if one attempts to either
                // 1. create a dropped source identifier, or
                // 2. create an existing source identifier with a new description.
                for (id, (description, since)) in bindings.iter() {
                    let description = Some(description.clone());
                    match self.source_descriptions.get(id) {
                        Some(None) => {
                            panic!("Attempt to recreate dropped source id: {:?}", id);
                        }
                        Some(prior_description) => {
                            if prior_description != &description {
                                panic!(
                                    "Multiple distinct descriptions created for source id {}: {:?} and {:?}",
                                    id,
                                    prior_description.as_ref().unwrap(),
                                    description.as_ref().unwrap(),
                                );
                            }
                        }
                        None => {
                            // All is well; no reason to panic.
                        }
                    }
                    // `description` is not read again, so move it rather than clone it.
                    self.source_descriptions.insert(*id, description);
                    // We start tracking `upper` at 0; correct this should that change (e.g. to `as_of`).
                    self.storage_since_uppers
                        .insert(*id, (since.clone(), Antichain::from_elem(0)));
                }
            }
            Command::Storage(StorageCommand::DropSources(identifiers)) => {
                for id in identifiers.iter() {
                    if !self.source_descriptions.contains_key(id) {
                        tracing::error!("Source id {} dropped without first being created", id);
                    } else {
                        // A `None` description records the drop without forgetting the id,
                        // so that attempts to recreate it can be detected above.
                        self.source_descriptions.insert(*id, None);
                    }
                }
            }
            Command::Compute(ComputeCommand::EnableLogging(logging_config), _instance) => {
                for id in logging_config.log_identifiers() {
                    self.compute_since_uppers
                        .insert(id, (Antichain::from_elem(0), Antichain::from_elem(0)));
                }
            }
            Command::Compute(ComputeCommand::CreateDataflows(dataflows), _instance) => {
                // Validate dataflows as having inputs whose `since` is less or equal to the dataflow's `as_of`.
                // Start tracking frontiers for each dataflow, using its `as_of` for each index and sink.
                for dataflow in dataflows.iter() {
                    let as_of = dataflow
                        .as_of
                        .as_ref()
                        .expect("Dataflow constructed without as_of set");
                    // Validate sources have `since.less_equal(as_of)`.
                    // TODO(mcsherry): Instead, return an error from the constructing method.
                    for (source_id, _) in dataflow.source_imports.iter() {
                        let (since, _upper) = self
                            .source_since_upper_for(*source_id)
                            .expect("Source frontiers absent in dataflow construction");
                        assert!(<_ as timely::order::PartialOrder>::less_equal(
                            &since,
                            &as_of.borrow()
                        ));
                    }
                    // Validate indexes have `since.less_equal(as_of)`.
                    // TODO(mcsherry): Instead, return an error from the constructing method.
                    for (index_id, _) in dataflow.index_imports.iter() {
                        let (since, _upper) = self
                            .index_since_upper_for(*index_id)
                            .expect("Index frontiers absent in dataflow construction");
                        assert!(<_ as timely::order::PartialOrder>::less_equal(
                            &since,
                            &as_of.borrow()
                        ));
                    }
                    for (sink_id, _) in dataflow.sink_exports.iter() {
                        // We start tracking `upper` at 0; correct this should that change (e.g. to `as_of`).
                        self.compute_since_uppers
                            .insert(*sink_id, (as_of.clone(), Antichain::from_elem(0)));
                    }
                    for (index_id, _, _) in dataflow.index_exports.iter() {
                        // We start tracking `upper` at 0; correct this should that change (e.g. to `as_of`).
                        self.compute_since_uppers
                            .insert(*index_id, (as_of.clone(), Antichain::from_elem(0)));
                    }
                }
            }
            Command::Compute(ComputeCommand::AllowIndexCompaction(frontiers), _instance) => {
                for (id, frontier) in frontiers.iter() {
                    self.compute_since_uppers.advance_since_for(*id, frontier);
                }
            }
            Command::Storage(StorageCommand::AllowSourceCompaction(frontiers)) => {
                for (id, frontier) in frontiers.iter() {
                    self.storage_since_uppers.advance_since_for(*id, frontier);
                }
            }
            _ => {}
        }
        self.client.send(cmd).await
    }
    /// Receives the next response from the underlying client, folding any reported
    /// compute frontier progress into the tracked `upper` frontiers before returning it.
    pub async fn recv(&mut self) -> Option<Response> {
        let response = self.client.recv().await;
        if let Some(Response::Compute(ComputeResponse::FrontierUppers(updates))) = response.as_ref()
        {
            for (id, changes) in updates.iter() {
                self.compute_since_uppers.update_upper_for(*id, changes);
            }
        }
        response
    }
}
impl<C> Controller<C> {
    /// Wraps `client` in a new controller with empty tracking state.
    pub fn new(client: C) -> Self {
        Self {
            client,
            source_descriptions: Default::default(),
            compute_since_uppers: Default::default(),
            storage_since_uppers: Default::default(),
        }
    }
    /// Returns the source description for a given identifier.
    ///
    /// The response does not distinguish between an as yet uncreated source description,
    /// and one that has been created and then dropped (or dropped without creation).
    /// There is a distinction and the client is aware of it, and could plausibly return
    /// this information if we had a use for it.
    pub fn source_description_for(&self, id: GlobalId) -> Option<&crate::sources::SourceDesc> {
        // Absent entries and dropped (`None`) entries both collapse to `None` here.
        self.source_descriptions
            .get(&id)
            .and_then(|description| description.as_ref())
    }
    /// Returns the pair of `since` and `upper` for a maintained index, if it exists.
    ///
    /// The `since` frontier indicates that the maintained data are certainly valid for times greater
    /// or equal to the frontier, but they may not be for other times. Attempting to create a dataflow
    /// using this `id` with an `as_of` that is not at least `since` will result in an error.
    ///
    /// The `upper` frontier indicates that the data are reported available for all times not greater
    /// or equal to the frontier. Dataflows with an `as_of` greater or equal to this frontier may not
    /// immediately produce results.
    pub fn index_since_upper_for(
        &self,
        id: GlobalId,
    ) -> Option<(AntichainRef<Timestamp>, AntichainRef<Timestamp>)> {
        self.compute_since_uppers.get(&id)
    }
    /// Returns the pair of `since` and `upper` for a source, if it exists.
    ///
    /// The `since` frontier indicates that the maintained data are certainly valid for times greater
    /// or equal to the frontier, but they may not be for other times. Attempting to create a dataflow
    /// using this `id` with an `as_of` that is not at least `since` will result in an error.
    ///
    /// The `upper` frontier indicates that the data are reported available for all times not greater
    /// or equal to the frontier. Dataflows with an `as_of` greater or equal to this frontier may not
    /// immediately produce results.
    pub fn source_since_upper_for(
        &self,
        id: GlobalId,
    ) -> Option<(AntichainRef<Timestamp>, AntichainRef<Timestamp>)> {
        self.storage_since_uppers.get(&id)
    }
}
/// Maps object identifiers to their tracked `(since, upper)` frontier pairs.
#[derive(Default)]
struct SinceUpperMap {
// For each identifier: its `since` frontier, then its `upper` frontier.
since_uppers:
std::collections::BTreeMap<GlobalId, (Antichain<Timestamp>, Antichain<Timestamp>)>,
}
impl SinceUpperMap {
    /// Records `since_upper` as the frontier pair for `id`, replacing any prior entry.
    fn insert(&mut self, id: GlobalId, since_upper: (Antichain<Timestamp>, Antichain<Timestamp>)) {
        self.since_uppers.insert(id, since_upper);
    }
    /// Returns the frontier pair for `id`, borrowed as antichain references.
    fn get(&self, id: &GlobalId) -> Option<(AntichainRef<Timestamp>, AntichainRef<Timestamp>)> {
        match self.since_uppers.get(id) {
            Some((since, upper)) => Some((since.borrow(), upper.borrow())),
            None => None,
        }
    }
    /// Advances the `since` frontier for `id` to at least `frontier`.
    fn advance_since_for(&mut self, id: GlobalId, frontier: &Antichain<Timestamp>) {
        match self.since_uppers.get_mut(&id) {
            Some((since, _upper)) => {
                use differential_dataflow::lattice::Lattice;
                since.join_assign(frontier);
            }
            None => {
                // If we allow compaction before the item is created, pre-restrict the valid range.
                // We start tracking `upper` at 0; correct this should that change (e.g. to `as_of`).
                self.since_uppers
                    .insert(id, (frontier.clone(), Antichain::from_elem(0)));
            }
        }
    }
    /// Folds `changes` into the `upper` frontier recorded for `id`, if one exists.
    fn update_upper_for(&mut self, id: GlobalId, changes: &ChangeBatch<Timestamp>) {
        match self.since_uppers.get_mut(&id) {
            Some((_since, upper)) => {
                // Merge the current `upper` into a copy of `changes`, then rebuild
                // `upper` from the times that survive cancellation.
                let mut merged = changes.clone();
                for time in upper.elements().iter() {
                    merged.update(time.clone(), 1);
                }
                upper.clear();
                for (time, count) in merged.drain() {
                    assert_eq!(count, 1);
                    upper.insert(time);
                }
            }
            None => {
                // No panic, as we could have recently dropped this.
                // If we can tell these are updates to an id that could still be constructed,
                // something is weird and we should error.
            }
        }
    }
}