1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
1516//! Parallelization contracts, describing requirements for data movement along dataflow edges.
1718use std::rc::Rc;
19use timely::communication::{Pull, Push};
20use timely::dataflow::channels::pact::{LogPuller, LogPusher, ParallelizationContract};
21use timely::dataflow::channels::{ContainerBytes, Message};
22use timely::logging::TimelyLogger;
23use timely::progress::Timestamp;
24use timely::worker::AsWorker;
25use timely::{Container, ExchangeData};
/// A parallelization contract that routes each container to one worker, choosing the
/// target workers in round-robin order.
///
/// Use it when records must be spread across workers but the specific worker a record
/// lands on does not matter.
#[derive(Debug, Clone, Copy, Default)]
pub struct Distribute;
3031impl<T, C> ParallelizationContract<T, C> for Distribute
32where
33T: Timestamp,
34 C: Container + ContainerBytes + Send + 'static,
35{
36type Pusher = DistributePusher<LogPusher<T, C, Box<dyn Push<Message<T, C>>>>>;
37type Puller = LogPuller<T, C, Box<dyn Pull<Message<T, C>>>>;
3839fn connect<A: AsWorker>(
40self,
41 allocator: &mut A,
42 identifier: usize,
43 address: Rc<[usize]>,
44 logging: Option<TimelyLogger>,
45 ) -> (Self::Pusher, Self::Puller) {
46let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
47let senders = senders
48 .into_iter()
49 .enumerate()
50 .map(|(i, x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone()))
51 .collect::<Vec<_>>();
52 (
53 DistributePusher::new(senders),
54 LogPuller::new(receiver, allocator.index(), identifier, logging.clone()),
55 )
56 }
57}
/// Distributes messages among target pushers in round-robin order.
///
/// Each data message is forwarded, as a whole, to exactly one pusher, so no individual
/// record is inspected for routing. This makes it more efficient than `Exchange` when the
/// target worker doesn't matter.
#[derive(Debug)]
pub struct DistributePusher<P> {
    /// The downstream pushers that data is rotated through.
    pushers: Vec<P>,
    /// Index into `pushers` of the pusher that receives the next data message.
    next: usize,
}
6667impl<P> DistributePusher<P> {
68/// Allocates a new `DistributePusher` from a supplied set of pushers
69pub fn new(pushers: Vec<P>) -> DistributePusher<P> {
70Self { pushers, next: 0 }
71 }
72}
7374impl<T, C, P> Push<Message<T, C>> for DistributePusher<P>
75where
76T: Eq + ExchangeData,
77 C: Container,
78 P: Push<Message<T, C>>,
79{
80fn push(&mut self, message: &mut Option<Message<T, C>>) {
81let worker_idx = self.next;
82self.next = (self.next + 1) % self.pushers.len();
83self.pushers[worker_idx].push(message);
84 }
85}