1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::cell::RefCell;
use std::rc::Rc;

use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::generic::{OperatorInfo, OutputHandle};
use timely::dataflow::operators::{Capability, CapabilitySet};
use timely::dataflow::{Scope, Stream};
use timely::Data;

use repr::Timestamp;

use super::{SourceStatus, SourceToken};

/// Builds a Timely source operator called `name` inside `scope`. Its lifetime
/// is governed both by the source's own logic and by an externally held token.
///
/// # Source logic
///
/// `construct` is invoked once, while the operator is being built, with the
/// operator's [`OperatorInfo`]. It must return a `tick` closure satisfying `L`.
/// Each time the operator is scheduled and still holds its capabilities,
/// `tick` is called with, in order:
///
/// 1. the capability for the primary "data" output,
/// 2. the capability for the secondary output,
/// 3. a [`CapabilitySet`] seeded with a clone of the data capability,
/// 4. the output handle for the data stream, and
/// 5. the output handle for the secondary stream.
///
/// `tick` is expected to downgrade the capabilities whenever it can tell that
/// a timestamp is "closed", by whatever rule is appropriate for the source.
/// The secondary output is independent of the data output; it can be used,
/// for example, to emit and persist timestamp bindings.
///
/// # Return value of `tick`
///
/// * [`SourceStatus::Done`] means `tick` will never produce data again. The
///   operator immediately drops all of its capabilities, and this function
///   never reinstates them, so `tick` is not called again.
/// * [`SourceStatus::Alive`] keeps the capabilities in place. It does *not*
///   by itself reschedule the operator: `tick` must arrange that through a
///   [`timely::scheduling::activate::Activator`].
///
/// # External lifetime control
///
/// The returned [`SourceToken`] is created from a second handle to the shared
/// capability slot, together with an activator for this operator. It is meant
/// to be the external lifetime control: once the last clone of the token is
/// dropped, `tick` should no longer be called and the capabilities are
/// eventually released. That behavior is provided by `SourceToken` itself
/// rather than by this function.
///
/// # Returns
///
/// The data stream, the secondary stream, and the [`SourceToken`].
///
/// # Panics
///
/// Panics if the operator builder supplies fewer than two initial
/// capabilities, or if `build` returns without having run the constructor
/// closure (which would leave the token unset).
pub fn source<G, D, D2, B, L>(
    scope: &G,
    name: String,
    construct: B,
) -> (Stream<G, D>, Stream<G, D2>, SourceToken)
where
    G: Scope<Timestamp = Timestamp>,
    D: Data,
    D2: Data,
    B: FnOnce(OperatorInfo) -> L,
    L: FnMut(
            &mut Capability<Timestamp>,
            &mut Capability<Timestamp>,
            &mut CapabilitySet<Timestamp>,
            &mut OutputHandle<G::Timestamp, D, Tee<G::Timestamp, D>>,
            &mut OutputHandle<G::Timestamp, D2, Tee<G::Timestamp, D2>>,
        ) -> SourceStatus
        + 'static,
{
    let mut builder = OperatorBuilder::new(name, scope.clone());
    let operator_info = builder.operator_info();

    // Outputs are registered in the order data, then secondary; the initial
    // capabilities handed to the `build` closure are unpacked to match.
    let (mut data_output, data_stream) = builder.new_output();
    let (mut secondary_output, secondary_stream) = builder.new_output();
    builder.set_notify(false);

    // Filled in from inside the `build` closure below.
    let mut exported_token = None;

    builder.build(|mut initial_caps| {
        // `initial_caps` should hold exactly two entries; popping from the
        // back yields the secondary capability first, then the data one.
        let secondary_cap = initial_caps.pop().unwrap();
        let data_cap = initial_caps.pop().unwrap();
        let durability_caps = CapabilitySet::from_elem(data_cap.clone());

        // Shared slot: `Some` while the source is alive, `None` once the
        // capabilities have been given up.
        let shared_caps = Rc::new(RefCell::new(Some((
            data_cap,
            secondary_cap,
            durability_caps,
        ))));

        // Export a token to the outside world that will keep this source
        // alive.
        // NOTE(review): earlier documentation said dropping the token sets a
        // `timestamping_flag` to false to stop spawned threads; no such flag
        // is visible in this function — confirm against `SourceToken`.
        exported_token = Some(SourceToken {
            capabilities: Rc::clone(&shared_caps),
            activator: scope.activator_for(&operator_info.address[..]),
        });

        let mut tick = construct(operator_info);

        move |_frontier| {
            let mut slot = shared_caps.borrow_mut();

            // Without capabilities the source is finished; nothing to do.
            let (data, secondary, durability) = match &mut *slot {
                Some(live) => live,
                None => return,
            };

            // The capabilities are still held, so the source is alive: hand
            // control to the inner source logic. The handles are declared
            // here so they outlive the capability drop below.
            let mut data_handle = data_output.activate();
            let mut secondary_handle = secondary_output.activate();
            let status = tick(
                data,
                secondary,
                durability,
                &mut data_handle,
                &mut secondary_handle,
            );

            if let SourceStatus::Done = status {
                // The inner source has finished for good; release every
                // capability.
                *slot = None;
            }
        }
    });

    // `build()` promises to run the closure above before returning, so the
    // token has been set by now.
    (data_stream, secondary_stream, exported_token.unwrap())
}