mz_timely_util/
pact.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Parallelization contracts, describing requirements for data movement along dataflow edges.
17
18use std::rc::Rc;
19use timely::communication::{Pull, Push};
20use timely::dataflow::channels::pact::{LogPuller, LogPusher, ParallelizationContract};
21use timely::dataflow::channels::{ContainerBytes, Message};
22use timely::logging::TimelyLogger;
23use timely::progress::Timestamp;
24use timely::worker::AsWorker;
25use timely::{Container, ExchangeData};
26
/// A connection that distributes containers to all workers in a round-robin fashion.
///
/// Containers are forwarded whole: the destination is picked purely by position in the
/// rotation and never by inspecting the data. Use it as the parallelization contract of
/// an operator input when it does not matter which worker receives a given container.
#[derive(Debug)]
pub struct Distribute;
30
31impl<T, C> ParallelizationContract<T, C> for Distribute
32where
33    T: Timestamp,
34    C: Container + ContainerBytes + Send + 'static,
35{
36    type Pusher = DistributePusher<LogPusher<T, C, Box<dyn Push<Message<T, C>>>>>;
37    type Puller = LogPuller<T, C, Box<dyn Pull<Message<T, C>>>>;
38
39    fn connect<A: AsWorker>(
40        self,
41        allocator: &mut A,
42        identifier: usize,
43        address: Rc<[usize]>,
44        logging: Option<TimelyLogger>,
45    ) -> (Self::Pusher, Self::Puller) {
46        let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
47        let senders = senders
48            .into_iter()
49            .enumerate()
50            .map(|(i, x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone()))
51            .collect::<Vec<_>>();
52        (
53            DistributePusher::new(senders),
54            LogPuller::new(receiver, allocator.index(), identifier, logging.clone()),
55        )
56    }
57}
58
/// Distributes records among target pushees.
///
/// It is more efficient than `Exchange` when the target worker doesn't matter
pub struct DistributePusher<P> {
    // The pushees to rotate over, in the order they were supplied to `new`.
    pushers: Vec<P>,
    // Index into `pushers` of the pushee the rotation hands its next element to.
    next: usize,
}
66
67impl<P> DistributePusher<P> {
68    /// Allocates a new `DistributePusher` from a supplied set of pushers
69    pub fn new(pushers: Vec<P>) -> DistributePusher<P> {
70        Self { pushers, next: 0 }
71    }
72}
73
74impl<T, C, P> Push<Message<T, C>> for DistributePusher<P>
75where
76    T: Eq + ExchangeData,
77    C: Container,
78    P: Push<Message<T, C>>,
79{
80    fn push(&mut self, message: &mut Option<Message<T, C>>) {
81        let worker_idx = self.next;
82        self.next = (self.next + 1) % self.pushers.len();
83        self.pushers[worker_idx].push(message);
84    }
85}