mz_compute/command_channel.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A channel for broadcasting compute commands from worker 0 to other workers.
//!
//! Compute uses a dataflow to distribute commands between workers. This ensures that workers
//! retain consistent dataflow state across reconnects. If each worker received its commands
//! directly from the controller, there would be no guarantee that after a reconnect all workers
//! have seen the same sequence of commands. This is particularly problematic for `CreateDataflow`
//! commands, since Timely requires that all workers render the same dataflows in the same order.
//! So the controller instead sends commands only to worker 0, which then broadcasts them to the
//! other workers through the Timely fabric, taking care of the correct sequencing.
//!
//! Commands in the command channel are tagged with a nonce identifying the incarnation of the
//! compute protocol the command belongs to, allowing workers to recognize client reconnects that
//! require a reconciliation.

use std::sync::{Arc, Mutex};

use crossbeam_channel::TryRecvError;
use itertools::Itertools;
use mz_compute_client::protocol::command::ComputeCommand;
use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
use mz_ore::cast::CastFrom;
use timely::communication::Allocate;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::generic::source;
use timely::scheduling::{Scheduler, SyncActivator};
use timely::worker::Worker as TimelyWorker;
use uuid::Uuid;

/// A sender pushing commands onto the command channel.
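///
/// A `Sender` exists on every worker, but only commands sent on worker 0 are broadcast through
/// the dataflow; commands sent on other workers are merely drained (see [`render`]).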
pub struct Sender {
    tx: crossbeam_channel::Sender<(ComputeCommand, Uuid)>,
    activator: Arc<Mutex<Option<SyncActivator>>>,
}

impl Sender {
    /// Broadcasts the given command to all workers.
    pub fn send(&self, message: (ComputeCommand, Uuid)) {
        if self.tx.send(message).is_err() {
            unreachable!("command channel never shuts down");
        }

        self.activator
            .lock()
            .expect("poisoned")
            .as_ref()
            .map(|a| a.activate());
    }
}

/// A receiver reading commands from the command channel.
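///
/// Every worker obtains its commands exclusively through its `Receiver`, regardless of which
/// worker the controller is connected to.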
pub struct Receiver {
    rx: crossbeam_channel::Receiver<(ComputeCommand, Uuid)>,
}

impl Receiver {
    /// Returns the next available command, if any.
    ///
    /// This returns `None` when no command is currently available; more commands may still
    /// arrive in the future.
    pub fn try_recv(&self) -> Option<(ComputeCommand, Uuid)> {
        match self.rx.try_recv() {
            Ok(msg) => Some(msg),
            Err(TryRecvError::Empty) => None,
            Err(TryRecvError::Disconnected) => {
                unreachable!("command channel never shuts down");
            }
        }
    }
}

/// Render the command channel dataflow.
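///
/// Returns a [`Sender`]/[`Receiver`] pair. A minimal usage sketch (illustrative only, not a
/// doctest; `worker`, `command`, and `nonce` are assumed to already exist in the caller):
///
/// ```ignore
/// let (tx, rx) = render(worker);
///
/// // Worker 0 forwards the commands it receives from the controller.
/// if worker.index() == 0 {
///     tx.send((command, nonce));
/// }
///
/// // Every worker steps the dataflow and drains its copy of the broadcast commands.
/// worker.step();
/// while let Some((command, nonce)) = rx.try_recv() {
///     // ... apply the command ...
/// }
/// ```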
pub fn render<A: Allocate>(timely_worker: &mut TimelyWorker<A>) -> (Sender, Receiver) {
    let (input_tx, input_rx) = crossbeam_channel::unbounded();
    let (output_tx, output_rx) = crossbeam_channel::unbounded();
    let activator = Arc::new(Mutex::new(None));

    // TODO(teskje): This implementation relies on Timely channels preserving the order of their
    // inputs, which is not something they guarantee. We can avoid that by using explicit indexing,
    // like storage's command sequencer does.
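    // The dataflow consists of a single source that is live only on worker 0 and drains
    // `input_rx`, an exchange that routes each command part to its target worker, and a sink on
    // every worker that forwards received commands into `output_tx`.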
    timely_worker.dataflow_named::<u64, _, _>("command_channel", {
        let activator = Arc::clone(&activator);
        move |scope| {
            source(scope, "command_channel::source", |cap, info| {
                let sync_activator = scope.sync_activator_for(info.address.to_vec());
                *activator.lock().expect("poisoned") = Some(sync_activator);

                let worker_id = scope.index();
                let peers = scope.peers();

                // Only worker 0 broadcasts commands; other workers must drop their capability to
                // avoid holding up dataflow progress.
                let mut capability = (worker_id == 0).then_some(cap);

                move |output| {
                    let Some(cap) = &mut capability else {
                        // Non-leader workers still receive `UpdateConfiguration` commands, which
                        // we must drain to avoid leaking memory.
                        while let Ok((cmd, _nonce)) = input_rx.try_recv() {
                            assert_ne!(worker_id, 0);
                            assert!(matches!(cmd, ComputeCommand::UpdateConfiguration(_)));
                        }
                        return;
                    };

                    assert_eq!(worker_id, 0);

                    let input: Vec<_> = input_rx.try_iter().collect();
                    for (cmd, nonce) in input {
                        let worker_cmds =
                            split_command(cmd, peers).map(|(idx, cmd)| (idx, cmd, nonce));
                        output.session(&cap).give_iterator(worker_cmds);

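                        // Advance the capability so each command is emitted at its own, strictly
                        // increasing timestamp.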
                        cap.downgrade(&(cap.time() + 1));
                    }
                }
            })
            .sink(
                Exchange::new(|(idx, _, _)| u64::cast_from(*idx)),
                "command_channel::sink",
                move |input| {
                    while let Some((_cap, data)) = input.next() {
                        for (_idx, cmd, nonce) in data.drain(..) {
                            let _ = output_tx.send((cmd, nonce));
                        }
                    }
                },
            );
        }
    });

    let tx = Sender {
        tx: input_tx,
        activator,
    };
    let rx = Receiver { rx: output_rx };

    (tx, rx)
}

/// Split the given command into the given number of parts.
///
/// Returns an iterator that produces each command part, along with its part index.
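///
/// `CreateDataflow` commands are split by partitioning their plans among the parts; all other
/// commands are replicated unchanged to every part.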
fn split_command(
    command: ComputeCommand,
    parts: usize,
) -> impl Iterator<Item = (usize, ComputeCommand)> {
    use itertools::Either;

    let commands = match command {
        ComputeCommand::CreateDataflow(dataflow) => {
            let dataflow = *dataflow;

            // A list of descriptions of objects for each part to build.
            let mut builds_parts = vec![Vec::new(); parts];
            // Partition each build description among `parts`.
            for build_desc in dataflow.objects_to_build {
                let build_part = build_desc.plan.partition_among(parts);
                for (plan, objects_to_build) in
                    build_part.into_iter().zip_eq(builds_parts.iter_mut())
                {
                    objects_to_build.push(BuildDesc {
                        id: build_desc.id,
                        plan,
                    });
                }
            }

            // Each list of build descriptions results in a dataflow description.
            let commands = builds_parts
                .into_iter()
                .map(move |objects_to_build| DataflowDescription {
                    source_imports: dataflow.source_imports.clone(),
                    index_imports: dataflow.index_imports.clone(),
                    objects_to_build,
                    index_exports: dataflow.index_exports.clone(),
                    sink_exports: dataflow.sink_exports.clone(),
                    as_of: dataflow.as_of.clone(),
                    until: dataflow.until.clone(),
                    debug_name: dataflow.debug_name.clone(),
                    initial_storage_as_of: dataflow.initial_storage_as_of.clone(),
                    refresh_schedule: dataflow.refresh_schedule.clone(),
                    time_dependence: dataflow.time_dependence.clone(),
                })
                .map(Box::new)
                .map(ComputeCommand::CreateDataflow);
            Either::Left(commands)
        }
        command => {
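            // All other commands are identical for every worker, so replicate them as-is.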
            let commands = std::iter::repeat_n(command, parts);
            Either::Right(commands)
        }
    };

    commands.into_iter().enumerate()
}