Struct coord::coord::Coordinator
pub struct Coordinator {
dataflow_client: Controller<Box<dyn Client>>,
view_optimizer: Optimizer,
catalog: Catalog,
persister: PersisterWithConfig,
indexes: ArrangementFrontiers<Timestamp>,
sources: ArrangementFrontiers<Timestamp>,
logical_compaction_window_ms: Option<Timestamp>,
logging_enabled: bool,
internal_cmd_tx: UnboundedSender<Message>,
metric_scraper: Scraper,
last_open_local_ts: Timestamp,
writes_at_open_ts: bool,
read_writes_at_open_ts: bool,
transient_id_counter: u64,
active_conns: HashMap<u32, ConnMeta>,
index_since_updates: Rc<RefCell<HashMap<GlobalId, Antichain<Timestamp>>>>,
source_since_updates: Rc<RefCell<HashMap<GlobalId, Antichain<Timestamp>>>>,
since_handles: HashMap<GlobalId, AntichainToken<Timestamp>>,
txn_reads: HashMap<u32, TxnReads>,
sink_writes: HashMap<GlobalId, SinkWrites<Timestamp>>,
pending_peeks: HashMap<u32, UnboundedSender<PeekResponse>>,
pending_tails: HashMap<GlobalId, PendingTail>,
write_lock: Arc<Mutex<()>>,
write_lock_wait_group: VecDeque<DeferredPlan>,
}
Glues the external world to the Timely workers.
Fields
dataflow_client: Controller<Box<dyn Client>>
A client to a running dataflow cluster.
view_optimizer: Optimizer
Optimizer instance for logical optimization of views.
catalog: Catalog
persister: PersisterWithConfig
A runtime for the persist crate alongside its configuration.
indexes: ArrangementFrontiers<Timestamp>
Maps (global Id of arrangement) -> (frontier information). This tracks the upper and computed since of the indexes. The since is the time up to which we are willing to compact. determine_timestamp() uses this as part of its heuristic when determining a viable timestamp for queries.
sources: ArrangementFrontiers<Timestamp>
Map of frontier information for sources.
logical_compaction_window_ms: Option<Timestamp>
Delta from leading edge of an arrangement from which we allow compaction.
logging_enabled: bool
Whether base sources are enabled.
internal_cmd_tx: UnboundedSender<Message>
Channel to manage internal commands from the coordinator to itself.
metric_scraper: Scraper
Channel to communicate source status updates to the timestamper thread.
last_open_local_ts: Timestamp
The last known timestamp that was considered “open” (i.e. where writes may occur). However, this timestamp is not open when read_writes_at_open_ts == true; in this case, reads will occur at last_open_local_ts, and the Coordinator must open a new timestamp for writes.
Indirectly, this value aims to represent the Coordinator’s desired value for upper for table frontiers, as long as we know it is open.
writes_at_open_ts: bool
Whether or not we have written at the open timestamp.
read_writes_at_open_ts: bool
Whether or not we have read the writes that have occurred at the open timestamp. When this is true, it signals we need to open a new timestamp to support future writes.
transient_id_counter: u64
active_conns: HashMap<u32, ConnMeta>
A map from connection ID to metadata about that connection for all active connections.
index_since_updates: Rc<RefCell<HashMap<GlobalId, Antichain<Timestamp>>>>
Holds pending compaction messages to be sent to the dataflow workers. When since_handles are advanced or txn_reads are dropped, this can advance.
source_since_updates: Rc<RefCell<HashMap<GlobalId, Antichain<Timestamp>>>>
Holds pending compaction messages to be sent to the dataflow workers. When since_handles are advanced or txn_reads are dropped, this can advance.
since_handles: HashMap<GlobalId, AntichainToken<Timestamp>>
Holds handles to ids that are advanced by update_upper.
txn_reads: HashMap<u32, TxnReads>
Tracks active read transactions so that we don’t compact any indexes beyond an in-progress transaction.
sink_writes: HashMap<GlobalId, SinkWrites<Timestamp>>
Tracks write frontiers for active exactly-once sinks.
pending_peeks: HashMap<u32, UnboundedSender<PeekResponse>>
A map from pending peeks to the queue into which responses are sent, and the IDs of workers who have responded.
pending_tails: HashMap<GlobalId, PendingTail>
A map from pending tails to the tail description.
write_lock: Arc<Mutex<()>>
Serializes accesses to write critical sections.
write_lock_wait_group: VecDeque<DeferredPlan>
Holds plans deferred due to write lock.
Implementations
Creates a new dataflow builder from the catalog and indexes in self.
pub fn prepare_index_build(
catalog: &CatalogState,
index_id: &GlobalId
) -> Option<(String, IndexDesc)>
Prepares the arguments to an index build dataflow, by interrogating the catalog.
Returns None if the index entry in the catalog is not enabled.
pub async fn implement_fast_path_peek(
&mut self,
fast_path: Plan,
timestamp: Timestamp,
finishing: RowSetFinishing,
conn_id: u32,
source_arity: usize
) -> Result<ExecuteResponse, CoordError>
Implements a peek plan produced by create_plan above.
Assign a timestamp for a read from a local input. Reads following writes must be at a time >= the write’s timestamp; we choose “equal to” for simplicity’s sake and to open as few new timestamps as possible.
Assign a timestamp for a write to a local input. Writes following reads must ensure that they are assigned a strictly larger timestamp to ensure they are not visible to any real-time earlier reads.
Opens a new timestamp for local inputs at which writes may occur, and where reads should return quickly at a value 1 less.
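The interaction between the three timestamp methods above and the fields last_open_local_ts, writes_at_open_ts and read_writes_at_open_ts can be sketched as follows. This is a minimal model, not the coordinator's code: the type LocalClock and the method names read_ts, write_ts and open_new_ts are hypothetical, timestamps are plain u64 values, and opening a new timestamp is assumed to simply increment the previous one.

```rust
/// Hypothetical, simplified model of the local timestamp bookkeeping.
/// Assumption: timestamps are `u64` and a new timestamp is the old one plus 1.
#[derive(Debug)]
struct LocalClock {
    last_open_local_ts: u64,
    writes_at_open_ts: bool,
    read_writes_at_open_ts: bool,
}

impl LocalClock {
    fn new(start: u64) -> Self {
        LocalClock {
            last_open_local_ts: start,
            writes_at_open_ts: false,
            read_writes_at_open_ts: false,
        }
    }

    /// Reads following writes happen at the write's timestamp; otherwise
    /// they happen one below the open timestamp so they can return quickly.
    fn read_ts(&mut self) -> u64 {
        if self.writes_at_open_ts {
            // The open timestamp has now been observed by a read.
            self.read_writes_at_open_ts = true;
            self.last_open_local_ts
        } else {
            self.last_open_local_ts.saturating_sub(1)
        }
    }

    /// Writes following reads get a strictly larger timestamp.
    fn write_ts(&mut self) -> u64 {
        if self.read_writes_at_open_ts {
            self.open_new_ts();
        }
        self.writes_at_open_ts = true;
        self.last_open_local_ts
    }

    /// Opens a fresh timestamp at which no writes or reads have occurred yet.
    fn open_new_ts(&mut self) {
        self.last_open_local_ts += 1;
        self.writes_at_open_ts = false;
        self.read_writes_at_open_ts = false;
    }
}
```

In this model, consecutive writes share a timestamp until a read observes them, which keeps the number of opened timestamps small.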
fn new_index_frontiers<I>(
&mut self,
id: GlobalId,
initial: I,
compaction_window_ms: Option<Timestamp>
) -> Frontiers<Timestamp> where
I: IntoIterator<Item = Timestamp>,
Generate a new frontiers object that forwards since changes to index_since_updates.
Panics
This function panics if called twice with the same id.
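The forwarding of since changes into a shared map of pending updates might look roughly like the sketch below. All names here (SinceUpdates, Frontiers, Registry, advance_since) are hypothetical stand-ins, and ids and timestamps are reduced to u64; the real types use GlobalId and Antichain<Timestamp>.

```rust
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::rc::Rc;

/// Hypothetical stand-in for the shared map of pending since updates.
type SinceUpdates = Rc<RefCell<HashMap<u64, u64>>>;

/// Hypothetical frontiers object that records since advances in the shared map.
struct Frontiers {
    id: u64,
    since: u64,
    pending: SinceUpdates,
}

impl Frontiers {
    /// Advances `since` and forwards the change; regressions are ignored.
    fn advance_since(&mut self, new_since: u64) {
        if new_since > self.since {
            self.since = new_since;
            self.pending.borrow_mut().insert(self.id, new_since);
        }
    }
}

/// Hypothetical owner of the shared map, standing in for the coordinator.
struct Registry {
    pending: SinceUpdates,
    known: HashSet<u64>,
}

impl Registry {
    fn new() -> Self {
        Registry {
            pending: Rc::new(RefCell::new(HashMap::new())),
            known: HashSet::new(),
        }
    }

    /// Panics if called twice with the same `id`, mirroring the documented contract.
    fn new_frontiers(&mut self, id: u64, initial: u64) -> Frontiers {
        assert!(self.known.insert(id), "frontiers already created for id {}", id);
        Frontiers { id, since: initial, pending: Rc::clone(&self.pending) }
    }
}
```

Draining the shared map later yields one pending compaction message per id, holding the most recent since.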
fn new_source_frontiers<I>(
&mut self,
id: GlobalId,
initial: I,
compaction_window_ms: Option<Timestamp>
) -> Frontiers<Timestamp> where
I: IntoIterator<Item = Timestamp>,
Generate a new frontiers object that forwards since changes to source_since_updates.
Panics
This function panics if called twice with the same id.
async fn bootstrap(
&mut self,
builtin_table_updates: Vec<BuiltinTableUpdate>
) -> Result<(), CoordError>
Initializes coordinator state based on the contained catalog. Must be called after creating the coordinator and before calling the Coordinator::serve method.
async fn serve(
self,
internal_cmd_rx: UnboundedReceiver<Message>,
cmd_rx: UnboundedReceiver<Command>
)
Serves the coordinator, receiving commands from users over cmd_rx and feedback from dataflow workers over feedback_rx.
You must call bootstrap before calling this method.
Validate that all upper frontier updates obey the following invariants:
1. The upper frontier for each source, index and sink does not go backwards with upper updates.
2. upper never contains any times with negative multiplicity.
3. upper never contains any times with multiplicity greater than 1.
4. No updates increase the sum of all multiplicities in upper.
Note that invariants 2 - 4 require single dimensional time, and a fixed number of dataflow workers. If we migrate to multidimensional time then 2 no longer holds, and 3 relaxes to “the longest chain in upper has to have <= n_workers elements” and 4 relaxes to “no comparable updates increase the sum of all multiplicities in upper”.
If we ever switch to dynamically scaling the number of dataflow workers then 3 and 4 no longer hold.
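For single dimensional time, the four invariants can be checked mechanically. The sketch below is an illustration only: it assumes upper is modeled as a map from u64 times to i64 multiplicities and that updates arrive as (time, diff) pairs. The function name apply_upper_updates is hypothetical.

```rust
use std::collections::BTreeMap;

/// Applies `updates` to `upper` only if all four invariants hold afterwards.
/// Assumption: `upper` maps a time to its multiplicity; absent means 0.
fn apply_upper_updates(
    upper: &mut BTreeMap<u64, i64>,
    updates: &[(u64, i64)],
) -> Result<(), String> {
    let before_min = upper.keys().next().copied();
    let before_sum: i64 = upper.values().sum();

    // Work on a copy so a failed validation leaves `upper` untouched.
    let mut next = upper.clone();
    for &(time, diff) in updates {
        *next.entry(time).or_insert(0) += diff;
    }
    next.retain(|_, multiplicity| *multiplicity != 0);

    for (time, multiplicity) in &next {
        // Invariant 2: no negative multiplicities.
        if *multiplicity < 0 {
            return Err(format!("negative multiplicity at time {}", time));
        }
        // Invariant 3: no multiplicity greater than 1.
        if *multiplicity > 1 {
            return Err(format!("multiplicity above 1 at time {}", time));
        }
    }

    // Invariant 1: the frontier (smallest time) does not go backwards.
    if let (Some(before), Some(after)) = (before_min, next.keys().next().copied()) {
        if after < before {
            return Err(format!("upper regressed from {} to {}", before, after));
        }
    }

    // Invariant 4: the sum of all multiplicities does not increase.
    let after_sum: i64 = next.values().sum();
    if after_sum > before_sum {
        return Err("sum of multiplicities increased".to_string());
    }

    *upper = next;
    Ok(())
}
```

Moving the frontier from 5 to 7 is expressed as the updates (5, -1) and (7, 1), which leave the sum of multiplicities unchanged.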
Updates the upper frontier of a maintained arrangement or sink.
Forward the subset of since updates that belong to persisted tables’ primary indexes to the persisted tables themselves.
TODO: In the future the coordinator should perhaps track a table’s upper and since frontiers directly as it currently does for sources.
Perform maintenance work associated with the coordinator.
Primarily, this involves sequencing compaction commands, which should be issued whenever available.
async fn handle_statement(
&mut self,
session: &mut Session,
stmt: Statement<Raw>,
params: &Params
) -> Result<Plan, CoordError>
fn handle_declare(
&self,
session: &mut Session,
name: String,
stmt: Statement<Raw>,
param_types: Vec<Option<Type>>
) -> Result<(), CoordError>
fn handle_describe(
&self,
session: &mut Session,
name: String,
stmt: Option<Statement<Raw>>,
param_types: Vec<Option<Type>>
) -> Result<(), CoordError>
fn describe(
&self,
session: &Session,
stmt: Option<Statement<Raw>>,
param_types: Vec<Option<Type>>
) -> Result<StatementDesc, CoordError>
fn handle_verify_prepared_statement(
&self,
session: &mut Session,
name: &str
) -> Result<(), CoordError>
Verify a prepared statement is still valid.
Instruct the dataflow layer to cancel any ongoing, interactive work for the named conn_id.
Handle termination of a client session.
This cleans up any state in the coordinator associated with the session.
Handle removing in-progress transaction state regardless of the end action of the transaction.
Removes all temporary items created by the specified connection, though not the temporary schema itself.
async fn handle_sink_connector_ready(
&mut self,
id: GlobalId,
oid: u32,
connector: SinkConnector
) -> Result<(), CoordError>
async fn sequence_plan(
&mut self,
tx: ClientTransmitter<ExecuteResponse>,
session: Session,
plan: Plan
)
fn sequence_execute(
&mut self,
session: &mut Session,
plan: ExecutePlan
) -> Result<String, CoordError>
async fn sequence_create_database(
&mut self,
plan: CreateDatabasePlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_schema(
&mut self,
plan: CreateSchemaPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_role(
&mut self,
plan: CreateRolePlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_table(
&mut self,
session: &Session,
plan: CreateTablePlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_source(
&mut self,
session: &mut Session,
plan: CreateSourcePlan
) -> Result<ExecuteResponse, CoordError>
fn generate_create_source_ops(
&mut self,
session: &mut Session,
plans: Vec<CreateSourcePlan>
) -> Result<(Vec<(GlobalId, Option<GlobalId>)>, Vec<Op>), CoordError>
async fn sequence_create_sink(
&mut self,
session: Session,
plan: CreateSinkPlan,
tx: ClientTransmitter<ExecuteResponse>
)
fn generate_view_ops(
&mut self,
session: &Session,
name: FullName,
view: View,
replace: Option<GlobalId>,
materialize: bool
) -> Result<(Vec<Op>, Option<GlobalId>), CoordError>
async fn sequence_create_view(
&mut self,
session: &Session,
plan: CreateViewPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_views(
&mut self,
session: &mut Session,
plan: CreateViewsPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_index(
&mut self,
plan: CreateIndexPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_type(
&mut self,
plan: CreateTypePlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_drop_database(
&mut self,
plan: DropDatabasePlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_drop_schema(
&mut self,
plan: DropSchemaPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_drop_roles(
&mut self,
plan: DropRolesPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_drop_items(
&mut self,
plan: DropItemsPlan
) -> Result<ExecuteResponse, CoordError>
fn sequence_show_all_variables(
&mut self,
session: &Session
) -> Result<ExecuteResponse, CoordError>
fn sequence_show_variable(
&self,
session: &Session,
plan: ShowVariablePlan
) -> Result<ExecuteResponse, CoordError>
fn sequence_set_variable(
&self,
session: &mut Session,
plan: SetVariablePlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_end_transaction(
&mut self,
tx: ClientTransmitter<ExecuteResponse>,
session: Session,
action: EndTransactionAction
)
async fn sequence_end_transaction_inner(
&mut self,
session: &mut Session,
action: EndTransactionAction
) -> Result<impl Future<Output = Result<(), CoordError>>, CoordError>
Return the set of ids in a timedomain and verify timeline correctness.
When a user starts a transaction, we need to prevent compaction of anything they might read from. We use a heuristic of “anything in the same database schemas with the same timeline as whatever the first query is”.
async fn sequence_peek(
&mut self,
session: &mut Session,
plan: PeekPlan
) -> Result<ExecuteResponse, CoordError>
Sequence a peek, determining a timestamp and the most efficient dataflow interaction.
Peeks are sequenced by assigning a timestamp for evaluation, and then determining and deploying the most efficient evaluation plan. The peek could evaluate to a constant, be a simple read out of an existing arrangement, or require a new dataflow to build the results to return.
async fn sequence_tail(
&mut self,
session: &mut Session,
plan: TailPlan
) -> Result<ExecuteResponse, CoordError>
A policy for determining the timestamp for a peek.
The Timestamp result may be None in the case that the when policy cannot be satisfied, which is possible due to the restricted validity of traces (each has a since and upper frontier, and is only valid after since and only sure to be available for times not after upper). The set of indexes used is also returned.
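A rough sketch of such a policy, under simplifying assumptions: frontiers are single u64 values rather than antichains, a time t is treated as valid for a trace when t >= since, and the immediate policy picks the largest time known to be complete across all inputs, advanced to since if necessary. The names Frontier, PeekWhen and this free-standing determine_timestamp are hypothetical.

```rust
/// Hypothetical single-dimensional frontier pair for one index.
#[derive(Clone, Copy, Debug)]
struct Frontier {
    since: u64,
    upper: u64,
}

/// Hypothetical peek policies.
enum PeekWhen {
    Immediately,
    AtTimestamp(u64),
}

/// Picks a timestamp valid for every input, or `None` if the policy
/// cannot be satisfied.
fn determine_timestamp(frontiers: &[Frontier], when: PeekWhen) -> Option<u64> {
    // A timestamp must not be earlier than any input's `since`.
    let since = frontiers.iter().map(|f| f.since).max().unwrap_or(0);
    match when {
        PeekWhen::AtTimestamp(t) => {
            if t >= since {
                Some(t)
            } else {
                // Compaction has already advanced past the requested time.
                None
            }
        }
        PeekWhen::Immediately => {
            // Largest time complete in all inputs: one below the smallest upper.
            let candidate = frontiers
                .iter()
                .map(|f| f.upper)
                .min()
                .map_or(0, |upper| upper.saturating_sub(1));
            // Never go below `since`, even if that means waiting for data.
            Some(candidate.max(since))
        }
    }
}
```

With inputs (since 3, upper 10) and (since 5, upper 8), the immediate policy yields 7, while an explicit request for time 4 cannot be satisfied.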
Determine the frontier of updates to start from for a sink based on source_id.
Updates greater or equal to this frontier will be produced.
fn sequence_explain(
&mut self,
session: &Session,
plan: ExplainPlan
) -> Result<ExecuteResponse, CoordError>
fn sequence_send_diffs(
&mut self,
session: &mut Session,
plan: SendDiffsPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_insert(
&mut self,
tx: ClientTransmitter<ExecuteResponse>,
session: Session,
plan: InsertPlan
)
fn sequence_insert_constant(
&mut self,
session: &mut Session,
id: GlobalId,
constants: MirRelationExpr
) -> Result<ExecuteResponse, CoordError>
fn sequence_copy_rows(
&mut self,
session: &mut Session,
id: GlobalId,
columns: Vec<usize>,
rows: Vec<Row>
) -> Result<ExecuteResponse, CoordError>
async fn sequence_read_then_write(
&mut self,
tx: ClientTransmitter<ExecuteResponse>,
session: Session,
plan: ReadThenWritePlan
)
async fn sequence_alter_item_rename(
&mut self,
plan: AlterItemRenamePlan
) -> Result<ExecuteResponse, CoordError>
fn sequence_alter_index_set_options(
&mut self,
plan: AlterIndexSetOptionsPlan
) -> Result<ExecuteResponse, CoordError>
fn sequence_alter_index_reset_options(
&mut self,
plan: AlterIndexResetOptionsPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_alter_index_enable(
&mut self,
plan: AlterIndexEnablePlan
) -> Result<ExecuteResponse, CoordError>
async fn catalog_transact<F, T>(
&mut self,
ops: Vec<Op>,
f: F
) -> Result<T, CoordError> where
F: FnOnce(DataflowBuilder<'_>) -> Result<T, CoordError>,
Perform a catalog transaction. The closure is passed a DataflowBuilder made from the prospective CatalogState (i.e., the Catalog with ops applied but before the transaction is committed). The closure can return an error to abort the transaction, or otherwise return a value that is returned by this function. This allows callers to error while building DataflowDescs. Coordinator::ship_dataflow must be called after this function successfully returns on any built DataflowDesc.
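The closure-based commit protocol described above can be illustrated with a toy catalog. This is a sketch of the pattern only: CatalogState, Op and Catalog below are hypothetical simplifications, the closure receives the prospective state directly instead of a DataflowBuilder, and errors are plain strings.

```rust
/// Hypothetical catalog state: just a list of item names.
#[derive(Clone, Debug, PartialEq)]
struct CatalogState {
    items: Vec<String>,
}

/// Hypothetical catalog operations.
enum Op {
    CreateItem(String),
    DropItem(String),
}

struct Catalog {
    state: CatalogState,
}

impl Catalog {
    /// Applies `ops` to a copy of the state, lets `f` inspect the prospective
    /// state, and commits only if `f` succeeds.
    fn transact<F, T>(&mut self, ops: Vec<Op>, f: F) -> Result<T, String>
    where
        F: FnOnce(&CatalogState) -> Result<T, String>,
    {
        let mut prospective = self.state.clone();
        for op in ops {
            match op {
                Op::CreateItem(name) => prospective.items.push(name),
                Op::DropItem(name) => prospective.items.retain(|n| *n != name),
            }
        }
        // An error from the closure aborts before anything is committed.
        let value = f(&prospective)?;
        self.state = prospective;
        Ok(value)
    }
}
```

Because the closure runs before the commit, a failure while building a dataflow leaves the catalog exactly as it was.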
fn set_index_options(
&mut self,
id: GlobalId,
options: Vec<IndexOption>
) -> Result<(), CoordError>
fn prep_relation_expr(
&mut self,
expr: MirRelationExpr,
style: ExprPrepStyle
) -> Result<OptimizedMirRelationExpr, CoordError>
Prepares a relation expression for execution by preparing all contained scalar expressions (see prep_scalar_expr), then optimizing the relation expression.
Prepares a scalar expression for execution by replacing any placeholders with their correct values.
Specifically, calls to the special function MzLogicalTimestamp are replaced if style is OneShot { logical_timestamp }. Calls are not replaced for the Explain style nor for Static which should not reach this point if we have correctly validated the use of placeholders.
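The style-dependent replacement can be sketched on a toy expression type. ScalarExpr below is a hypothetical three-variant stand-in for the real scalar expression type, and returning an error for the Static style is an assumption made for illustration; the text above only states that such calls should not reach this point.

```rust
/// Hypothetical, minimal scalar expression type.
#[derive(Debug, PartialEq)]
enum ScalarExpr {
    Literal(u64),
    MzLogicalTimestamp,
    Add(Box<ScalarExpr>, Box<ScalarExpr>),
}

/// The three preparation styles named in the documentation.
enum ExprPrepStyle {
    Static,
    Explain,
    OneShot { logical_timestamp: u64 },
}

/// Replaces `MzLogicalTimestamp` placeholders in place according to `style`.
fn prep_scalar_expr(expr: &mut ScalarExpr, style: &ExprPrepStyle) -> Result<(), String> {
    match expr {
        ScalarExpr::Literal(_) => Ok(()),
        ScalarExpr::Add(lhs, rhs) => {
            prep_scalar_expr(lhs, style)?;
            prep_scalar_expr(rhs, style)
        }
        ScalarExpr::MzLogicalTimestamp => match style {
            ExprPrepStyle::OneShot { logical_timestamp } => {
                // A one-shot query knows its timestamp, so substitute it.
                *expr = ScalarExpr::Literal(*logical_timestamp);
                Ok(())
            }
            // Explain leaves the placeholder visible in the plan.
            ExprPrepStyle::Explain => Ok(()),
            // Assumption: report an error rather than silently continuing.
            ExprPrepStyle::Static => {
                Err("mz_logical_timestamp in a static expression".to_string())
            }
        },
    }
}
```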
Finalizes a dataflow and then broadcasts it to all workers. Utility method for the more general Self::ship_dataflows.
Finalizes a list of dataflows and then broadcasts it to all workers.
Finalizes a dataflow.
Finalization includes optimization, but also validation of various invariants such as ensuring that the as_of frontier is in advance of the various since frontiers of participating data inputs.
In particular, there are requirements on the as_of field for the dataflow and the since frontiers of created arrangements, as a function of the since frontiers of dataflow inputs (sources and imported arrangements).
Panics
Panics if as_of is < the since frontiers.
Panics if the dataflow descriptions contain an invalid plan.
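The relationship between as_of and the input since frontiers can be sketched for single dimensional time. The helper resolve_as_of is hypothetical, frontiers are reduced to u64 values, defaulting an unset as_of to the combined input since is an assumption, and the sketch returns an error where the documented method panics.

```rust
/// Resolves the `as_of` for a dataflow given the `since` of each input.
/// Assumption: an unset `as_of` defaults to the largest input `since`.
fn resolve_as_of(as_of: Option<u64>, input_sinces: &[u64]) -> Result<u64, String> {
    // The dataflow can only be correct from the latest input `since` onward.
    let since = input_sinces.iter().copied().max().unwrap_or(0);
    match as_of {
        None => Ok(since),
        Some(t) if t >= since => Ok(t),
        Some(t) => Err(format!("as_of {} is before input since {}", t, since)),
    }
}
```

With input since values 3 and 7, an as_of of 9 is accepted, an unset as_of resolves to 7, and an as_of of 5 is rejected.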
Return an error if the ids are from incompatible timelines. This should be used to prevent users from doing things that are either meaningless (joining data from timelines that have similar numbers with different meanings like two separate debezium topics) or will never complete (joining cdcv2 and realtime data).
Attempts to immediately grant session access to the write lock or errors if the lock is currently held.
fn defer_write(
&mut self,
tx: ClientTransmitter<ExecuteResponse>,
session: Session,
plan: Plan
)
Defers executing plan until the write lock becomes available; waiting occurs in a green thread, so callers of this function likely want to return after calling it.
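How write_lock and write_lock_wait_group cooperate can be modeled without any async machinery. In the sketch below the lock is reduced to an Option holding the owning connection id and plans are plain strings; WriteGate, try_grant and release are hypothetical names, and handing the lock straight to the next deferred plan on release is an assumption.

```rust
use std::collections::VecDeque;

/// Hypothetical model of the write lock plus its queue of deferred plans.
struct WriteGate {
    holder: Option<u32>,
    wait_group: VecDeque<(u32, String)>,
}

impl WriteGate {
    fn new() -> Self {
        WriteGate { holder: None, wait_group: VecDeque::new() }
    }

    /// Grants the lock immediately, or errors if it is currently held.
    fn try_grant(&mut self, conn_id: u32) -> Result<(), String> {
        match self.holder {
            Some(owner) => Err(format!("write lock held by connection {}", owner)),
            None => {
                self.holder = Some(conn_id);
                Ok(())
            }
        }
    }

    /// Queues a plan to run once the lock becomes available.
    fn defer_write(&mut self, conn_id: u32, plan: String) {
        self.wait_group.push_back((conn_id, plan));
    }

    /// Releases the lock held by `conn_id` and hands it to the oldest
    /// deferred plan, if any. Returns that plan so the caller can run it.
    fn release(&mut self, conn_id: u32) -> Option<(u32, String)> {
        if self.holder != Some(conn_id) {
            return None;
        }
        self.holder = None;
        let next = self.wait_group.pop_front();
        if let Some((next_conn, _)) = &next {
            self.holder = Some(*next_conn);
        }
        next
    }
}
```

A caller that fails try_grant would call defer_write and return, leaving the plan to be resumed when the current holder releases the lock.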
Auto Trait Implementations
impl !RefUnwindSafe for Coordinator
impl !Send for Coordinator
impl !Sync for Coordinator
impl Unpin for Coordinator
impl !UnwindSafe for Coordinator
Blanket Implementations
Mutably borrows from an owned value.
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.