mz_timely_util/capture.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use std::sync::Arc;
17
18use mz_ore::channel::{InstrumentedChannelMetric, InstrumentedUnboundedSender};
19use timely::communication::Push;
20use timely::dataflow::operators::capture::{Event, EventPusher};
21
22/// A thread-safe linked list for cross-thread event streaming.
23///
24/// Uses `Arc`/`Mutex` internally, unlike the `Rc`/`RefCell`-based `EventLink`
25/// in `timely::capture`. Designed to be shared via `Arc<EventLink<T, C>>`:
26/// the writer side implements [`EventPusher`] and the reader side implements
27/// [`EventIterator`](timely::dataflow::operators::capture::event::EventIterator).
28pub use timely::dataflow::operators::capture::event::link_sync::EventLink;
29
30/// Creates a linked `(writer, reader)` pair for cross-thread event streaming.
31///
32/// Both handles begin pointing at the same empty sentinel node. The writer
33/// appends events via [`EventPusher`]; the reader chases the list and yields
34/// them via [`EventIterator`](timely::dataflow::operators::capture::event::EventIterator).
35pub fn arc_event_link<T, C>() -> (Arc<EventLink<T, C>>, Arc<EventLink<T, C>>) {
36 let shared = Arc::new(EventLink::new());
37 (Arc::clone(&shared), shared)
38}
39
40pub struct UnboundedTokioCapture<T, C, M>(pub InstrumentedUnboundedSender<Event<T, C>, M>);
41
42impl<T, C, M> EventPusher<T, C> for UnboundedTokioCapture<T, C, M>
43where
44 M: InstrumentedChannelMetric,
45{
46 fn push(&mut self, event: Event<T, C>) {
47 // NOTE: An Err(x) result just means "data not accepted" most likely
48 // because the receiver is gone. No need to panic.
49 let _ = self.0.send(event);
50 }
51}
52
53/// A helper type to allow capturing timely streams into timely pushers
54pub struct PusherCapture<P>(pub P);
55
56impl<P: Push<Event<T, D>>, T, D> EventPusher<T, D> for PusherCapture<P> {
57 fn push(&mut self, event: Event<T, D>) {
58 self.0.send(event);
59 self.0.done();
60 }
61}