Struct coord::coord::Coordinator

pub struct Coordinator {
    dataflow_client: Controller<Box<dyn Client>>,
    view_optimizer: Optimizer,
    catalog: Catalog,
    persister: PersisterWithConfig,
    indexes: ArrangementFrontiers<Timestamp>,
    sources: ArrangementFrontiers<Timestamp>,
    logical_compaction_window_ms: Option<Timestamp>,
    logging_enabled: bool,
    internal_cmd_tx: UnboundedSender<Message>,
    metric_scraper: Scraper,
    last_open_local_ts: Timestamp,
    writes_at_open_ts: bool,
    read_writes_at_open_ts: bool,
    transient_id_counter: u64,
    active_conns: HashMap<u32, ConnMeta>,
    index_since_updates: Rc<RefCell<HashMap<GlobalId, Antichain<Timestamp>>>>,
    source_since_updates: Rc<RefCell<HashMap<GlobalId, Antichain<Timestamp>>>>,
    since_handles: HashMap<GlobalId, AntichainToken<Timestamp>>,
    txn_reads: HashMap<u32, TxnReads>,
    sink_writes: HashMap<GlobalId, SinkWrites<Timestamp>>,
    pending_peeks: HashMap<u32, UnboundedSender<PeekResponse>>,
    pending_tails: HashMap<GlobalId, PendingTail>,
    write_lock: Arc<Mutex<()>>,
    write_lock_wait_group: VecDeque<DeferredPlan>,
}

Glues the external world to the Timely workers.

Fields

dataflow_client: Controller<Box<dyn Client>>

A client to a running dataflow cluster.

view_optimizer: Optimizer

Optimizer instance for logical optimization of views.

catalog: Catalog

persister: PersisterWithConfig

A runtime for the persist crate alongside its configuration.

indexes: ArrangementFrontiers<Timestamp>

Maps (global ID of arrangement) -> (frontier information). This tracks the upper and computed since of the indexes. The since is the time up to which we are willing to compact. determine_timestamp() uses this as part of its heuristic when determining a viable timestamp for queries.

sources: ArrangementFrontiers<Timestamp>

Map of frontier information for sources.

logical_compaction_window_ms: Option<Timestamp>

Delta from leading edge of an arrangement from which we allow compaction.
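
For intuition, here is a minimal sketch (not the actual Coordinator code; plain u64 millisecond timestamps assumed) of how such a window bounds compaction: the since may trail the leading edge by at most the window.

fn allowed_since(upper: u64, logical_compaction_window_ms: Option<u64>) -> u64 {
    match logical_compaction_window_ms {
        // Compact to at most `window` ms behind the leading edge,
        // saturating at time zero.
        Some(window) => upper.saturating_sub(window),
        // No window configured: hold the since at the minimum time,
        // i.e. do not compact.
        None => 0,
    }
}

fn main() {
    assert_eq!(allowed_since(10_000, Some(1_000)), 9_000);
    assert_eq!(allowed_since(500, Some(1_000)), 0);
    assert_eq!(allowed_since(10_000, None), 0);
}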

logging_enabled: bool

Whether the built-in logging sources are enabled.

internal_cmd_tx: UnboundedSender<Message>

Channel to manage internal commands from the coordinator to itself.

metric_scraper: Scraper

Handle to the metric scraper task.

last_open_local_ts: Timestamp

The last known timestamp that was considered “open” (i.e. where writes may occur). However, this timestamp is not open when read_writes_at_open_ts == true; in this case, reads will occur at last_open_local_ts, and the Coordinator must open a new timestamp for writes.

Indirectly, this value represents the Coordinator’s desired value for the upper of table frontiers, as long as the timestamp is known to be open.

writes_at_open_ts: bool

Whether or not we have written at the open timestamp.

read_writes_at_open_ts: bool

Whether or not we have read the writes that have occurred at the open timestamp. When this is true, it signals we need to open a new timestamp to support future writes.

transient_id_counter: u64

active_conns: HashMap<u32, ConnMeta>

A map from connection ID to metadata about that connection for all active connections.

index_since_updates: Rc<RefCell<HashMap<GlobalId, Antichain<Timestamp>>>>

Holds pending compaction messages to be sent to the dataflow workers. When since_handles are advanced or txn_reads are dropped, this can advance.

source_since_updates: Rc<RefCell<HashMap<GlobalId, Antichain<Timestamp>>>>

Holds pending compaction messages to be sent to the dataflow workers. When since_handles are advanced or txn_reads are dropped, this can advance.

since_handles: HashMap<GlobalId, AntichainToken<Timestamp>>

Holds handles to ids that are advanced by update_upper.

txn_reads: HashMap<u32, TxnReads>

Tracks active read transactions so that we don’t compact any indexes beyond an in-progress transaction.

sink_writes: HashMap<GlobalId, SinkWrites<Timestamp>>

Tracks write frontiers for active exactly-once sinks.

pending_peeks: HashMap<u32, UnboundedSender<PeekResponse>>

A map from pending peeks (keyed by connection ID) to the channel into which their responses are sent.

pending_tails: HashMap<GlobalId, PendingTail>

A map from pending tails to the tail description.

write_lock: Arc<Mutex<()>>

Serializes accesses to write critical sections.

write_lock_wait_group: VecDeque<DeferredPlan>

Holds plans deferred due to write lock.

Implementations

Creates a new dataflow builder from the catalog and indexes in self.

Prepares the arguments to an index build dataflow, by interrogating the catalog.

Returns None if the index entry in the catalog is not enabled.

Implements a peek plan produced by create_plan above.

Assign a timestamp for a read from a local input. Reads following writes must be at a time >= the write’s timestamp; we choose “equal to” for simplicity’s sake and to open as few new timestamps as possible.

Assign a timestamp for a write to a local input. Writes following reads must ensure that they are assigned a strictly larger timestamp to ensure they are not visible to any real-time earlier reads.

Opens a new timestamp for local inputs at which writes may occur, and where reads should return quickly at a value 1 less.
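
Taken together, the three methods above cooperate via the fields last_open_local_ts, writes_at_open_ts, and read_writes_at_open_ts. A simplified model of that bookkeeping (method names invented; the real logic also consults the wall clock):

struct LocalTime {
    last_open_local_ts: u64,
    writes_at_open_ts: bool,
    read_writes_at_open_ts: bool,
}

impl LocalTime {
    /// Reads following writes occur at exactly the write's timestamp;
    /// otherwise they return quickly at one less than the open timestamp.
    fn get_local_read_ts(&mut self) -> u64 {
        if self.writes_at_open_ts {
            // We are reading the open timestamp's writes, so the next
            // write must be assigned a strictly larger timestamp.
            self.read_writes_at_open_ts = true;
            self.last_open_local_ts
        } else {
            self.last_open_local_ts.saturating_sub(1)
        }
    }

    /// Writes go to the open timestamp, opening a fresh one first if a
    /// read has already observed writes at the current open timestamp.
    fn get_local_write_ts(&mut self) -> u64 {
        if self.read_writes_at_open_ts {
            self.open_new_local_ts();
        }
        self.writes_at_open_ts = true;
        self.last_open_local_ts
    }

    /// Opens a new timestamp at which writes may occur.
    fn open_new_local_ts(&mut self) {
        self.last_open_local_ts += 1;
        self.writes_at_open_ts = false;
        self.read_writes_at_open_ts = false;
    }
}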

Generate a new frontiers object that forwards since changes to index_since_updates.

Panics

This function panics if called twice with the same id.

Generate a new frontiers object that forwards since changes to source_since_updates.

Panics

This function panics if called twice with the same id.

Initializes coordinator state based on the contained catalog. Must be called after creating the coordinator and before calling the Coordinator::serve method.

Serves the coordinator, receiving commands from users over cmd_rx and feedback from dataflow workers over feedback_rx.

You must call bootstrap before calling this method.

Validate that all upper frontier updates obey the following invariants:

  1. The upper frontier for each source, index, and sink does not go backwards with upper updates.
  2. upper never contains any times with negative multiplicity.
  3. upper never contains any times with multiplicity greater than 1.
  4. No updates increase the sum of all multiplicities in upper.

Note that invariants 2-4 require single-dimensional time and a fixed number of dataflow workers. If we migrate to multidimensional time then 2 no longer holds, 3 relaxes to “the longest chain in upper has to have <= n_workers elements”, and 4 relaxes to “no comparable updates increase the sum of all multiplicities in upper”. If we ever switch to dynamically scaling the number of dataflow workers then 3 and 4 no longer hold.
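
A hedged sketch of these checks for a single collection, modeling upper as a multiset of one-dimensional times and an update batch as (time, diff) pairs (names and representation are illustrative, not the real code, which works with timely frontiers):

use std::collections::HashMap;

fn validate_upper_updates(
    upper: &mut HashMap<u64, i64>,
    updates: &[(u64, i64)],
) -> Result<(), String> {
    let old_min = upper.keys().copied().min();
    let old_sum: i64 = upper.values().sum();
    // Apply the batch, dropping entries whose multiplicity reaches zero.
    for &(time, diff) in updates {
        *upper.entry(time).or_insert(0) += diff;
    }
    upper.retain(|_, m| *m != 0);
    // 1. The frontier (the minimum time, in one dimension) must not regress.
    if let (Some(old), Some(new)) = (old_min, upper.keys().copied().min()) {
        if new < old {
            return Err("upper frontier went backwards".into());
        }
    }
    for (&time, &m) in upper.iter() {
        // 2. No times with negative multiplicity.
        if m < 0 {
            return Err(format!("negative multiplicity at time {}", time));
        }
        // 3. No times with multiplicity greater than 1.
        if m > 1 {
            return Err(format!("multiplicity {} > 1 at time {}", m, time));
        }
    }
    // 4. Updates must not increase the sum of all multiplicities.
    let new_sum: i64 = upper.values().sum();
    if new_sum > old_sum {
        return Err("updates increased total multiplicity".into());
    }
    Ok(())
}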

Updates the upper frontier of a maintained arrangement or sink.

Forward the subset of since updates that belong to persisted tables’ primary indexes to the persisted tables themselves.

TODO: In the future the coordinator should perhaps track a table’s upper and since frontiers directly as it currently does for sources.

Perform maintenance work associated with the coordinator.

Primarily, this involves sequencing compaction commands, which should be issued whenever available.

Verify a prepared statement is still valid.

Instruct the dataflow layer to cancel any ongoing, interactive work for the named conn_id.

Handle termination of a client session.

This cleans up any state in the coordinator associated with the session.

Handle removing in-progress transaction state regardless of the end action of the transaction.

Removes all temporary items created by the specified connection, though not the temporary schema itself.

Return the set of ids in a timedomain and verify timeline correctness.

When a user starts a transaction, we need to prevent compaction of anything they might read from. We use a heuristic of “anything in the same database schemas with the same timeline as whatever the first query is”.

Sequence a peek, determining a timestamp and the most efficient dataflow interaction.

Peeks are sequenced by assigning a timestamp for evaluation, and then determining and deploying the most efficient evaluation plan. The peek could evaluate to a constant, be a simple read out of an existing arrangement, or require a new dataflow to build the results to return.
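
Schematically, the outcome of that decision could be represented as an enum like the following (variant names invented; the real planning types differ):

enum PeekPlan {
    /// The peek folded to a constant; return the rows directly.
    Constant(Vec<String>),
    /// Read directly out of an existing arrangement.
    ExistingArrangement { index_id: u64 },
    /// Build a transient dataflow, read its output, then tear it down.
    NewDataflow { transient_id: u64 },
}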

A policy for determining the timestamp for a peek.

The Timestamp result may be None in the case that the when policy cannot be satisfied, which is possible due to the restricted validity of traces (each trace has a since and an upper frontier, and is valid only for times at or beyond since and guaranteed to be available only for times not beyond upper). The set of indexes used is also returned.
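
A simplified sketch of such a policy for a single input with u64 timestamps (the real method combines the since and upper frontiers of every index the query uses):

enum PeekWhen {
    /// Use the latest timestamp that can be returned immediately.
    Immediately,
    /// Use a user-specified timestamp.
    AtTimestamp(u64),
}

/// Valid query times lie in the half-open interval [since, upper).
fn determine_timestamp(since: u64, upper: u64, when: PeekWhen) -> Option<u64> {
    match when {
        PeekWhen::Immediately => {
            // The most recent complete time is one less than the upper,
            // and it must not have been compacted away.
            let candidate = upper.checked_sub(1)?;
            if candidate >= since {
                Some(candidate)
            } else {
                None
            }
        }
        PeekWhen::AtTimestamp(ts) => {
            if ts >= since && ts < upper {
                Some(ts)
            } else {
                None
            }
        }
    }
}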

Determine the frontier of updates to start from for a sink based on source_id.

Updates greater or equal to this frontier will be produced.

Perform a catalog transaction. The closure is passed a DataflowBuilder made from the prospective CatalogState (i.e., the Catalog with ops applied but before the transaction is committed). The closure can return an error to abort the transaction, or otherwise return a value that is then returned by this function. This allows callers to error while building DataflowDescs. After this function successfully returns, Coordinator::ship_dataflow must be called on any DataflowDesc built this way.

Prepares a relation expression for execution by preparing all contained scalar expressions (see prep_scalar_expr), then optimizing the relation expression.

Prepares a scalar expression for execution by replacing any placeholders with their correct values.

Specifically, calls to the special function MzLogicalTimestamp are replaced if style is OneShot { logical_timestamp }. Calls are not replaced for the Explain style, nor for Static, which should not reach this point if we have correctly validated the use of placeholders.
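
A minimal sketch of that substitution with stand-in types (the real ScalarExpr and style handling are considerably richer):

#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum ScalarExpr {
    MzLogicalTimestamp,
    Literal(u64),
}

#[allow(dead_code)]
enum ExprPrepStyle {
    OneShot { logical_timestamp: u64 },
    Explain,
    Static,
}

fn prep_scalar_expr(expr: &mut ScalarExpr, style: &ExprPrepStyle) {
    // Only a one-shot execution has a concrete logical timestamp to
    // substitute; other styles leave the call in place.
    if let ScalarExpr::MzLogicalTimestamp = expr {
        if let ExprPrepStyle::OneShot { logical_timestamp } = style {
            *expr = ScalarExpr::Literal(*logical_timestamp);
        }
    }
}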

Finalizes a dataflow and then broadcasts it to all workers. Utility method for the more general Self::ship_dataflows.

Finalizes a list of dataflows and then broadcasts it to all workers.

Finalizes a dataflow.

Finalization includes optimization, but also validation of various invariants such as ensuring that the as_of frontier is in advance of the various since frontiers of participating data inputs.

In particular, there are requirements on the as_of field of the dataflow and on the since frontiers of created arrangements, as a function of the since frontiers of the dataflow’s inputs (sources and imported arrangements).

Panics

Panics if as_of is < the since frontiers.

Panics if the dataflow descriptions contain an invalid plan.
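
A hedged sketch of the as_of-versus-since check with one-dimensional frontiers (the real code compares Antichains and panics rather than returning an error):

fn validate_as_of(as_of: u64, input_sinces: &[u64]) -> Result<(), String> {
    for &since in input_sinces {
        // Reading below an input's since would observe compacted data.
        if as_of < since {
            return Err(format!(
                "as_of {} is not in advance of input since {}",
                as_of, since
            ));
        }
    }
    Ok(())
}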

Return an error if the ids are from incompatible timelines. This should be used to prevent users from doing things that are either meaningless (joining data from timelines whose timestamps have similar numbers but different meanings, like two separate Debezium topics) or will never complete (joining CDCv2 and real-time data).
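
One way to picture the check, with invented Timeline variants (the real timeline type distinguishes more cases):

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Timeline {
    /// Real time, in milliseconds since the Unix epoch.
    EpochMilliseconds,
    /// A timeline defined by an external source's own notion of
    /// consistency, e.g. a Debezium or CDCv2 consistency topic.
    External(String),
}

/// Succeeds only if all inputs share a single timeline (or there are none).
fn validate_timeline(mut timelines: Vec<Timeline>) -> Result<Option<Timeline>, String> {
    timelines.sort();
    timelines.dedup();
    match timelines.len() {
        0 => Ok(None),
        1 => Ok(timelines.pop()),
        _ => Err("query involves data from incompatible timelines".into()),
    }
}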

Attempts to immediately grant session access to the write lock or errors if the lock is currently held.

Defers executing plan until the write lock becomes available; waiting occurs in a greenthread, so callers of this function likely want to return after calling it.
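
A sketch of these two paths using tokio’s async Mutex, matching the Arc<Mutex<()>> field above (DeferredPlan is a stand-in; in the real coordinator the deferred plan is handed back through the internal command channel once the lock is acquired):

use std::sync::Arc;
use tokio::sync::{mpsc, Mutex, OwnedMutexGuard};

struct DeferredPlan; // stand-in for the real deferred-plan type

/// Grant the lock immediately if it is free; `None` means it is held.
fn try_grant_write_lock(lock: &Arc<Mutex<()>>) -> Option<OwnedMutexGuard<()>> {
    Arc::clone(lock).try_lock_owned().ok()
}

/// Wait for the lock on a separate task (a "greenthread"), then hand the
/// plan and the guard back to the coordinator's command loop.
fn defer_write(
    lock: Arc<Mutex<()>>,
    plan: DeferredPlan,
    internal_cmd_tx: mpsc::UnboundedSender<(OwnedMutexGuard<()>, DeferredPlan)>,
) {
    tokio::spawn(async move {
        let guard = lock.lock_owned().await;
        // Ignore send errors: the coordinator may have shut down.
        let _ = internal_cmd_tx.send((guard, plan));
    });
}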
