pub struct Coordinator<S> {
    dataflow_client: Controller,
    view_optimizer: Optimizer,
    catalog: Catalog<S>,
    logical_compaction_window_ms: Option<Timestamp>,
    internal_cmd_tx: UnboundedSender<Message>,
    global_timeline: TimestampOracle<Timestamp>,
    advance_tables: AdvanceTables<Timestamp>,
    transient_id_counter: u64,
    active_conns: HashMap<u32, ConnMeta>,
    read_capability: HashMap<GlobalId, ReadCapability<Timestamp>>,
    txn_reads: HashMap<u32, TxnReads>,
    pending_peeks: HashMap<Uuid, PendingPeek>,
    client_pending_peeks: HashMap<u32, BTreeMap<Uuid, ComputeInstanceId>>,
    pending_tails: HashMap<GlobalId, PendingTail>,
    write_lock: Arc<Mutex<()>>,
    write_lock_wait_group: VecDeque<Deferred>,
    pending_writes: Vec<PendingWriteTxn>,
    secrets_controller: Box<dyn SecretsController>,
    replica_sizes: ClusterReplicaSizeMap,
    availability_zones: Vec<String>,
    connection_context: ConnectionContext,
    transient_replica_metadata: HashMap<ReplicaId, Option<ReplicaMetadata>>,
}

Glues the external world to the Timely workers.

Fields

dataflow_client: Controller

A client to a running dataflow cluster.

This component offers:

  • Sufficient isolation from COMPUTE, so long as communication with COMPUTE replicas is non-blocking.
  • Insufficient isolation from STORAGE. The ADAPTER cannot tolerate failure of STORAGE services.
view_optimizer: Optimizer

Optimizer instance for logical optimization of views.

catalog: Catalog<S>

logical_compaction_window_ms: Option<Timestamp>

Delta from leading edge of an arrangement from which we allow compaction.

internal_cmd_tx: UnboundedSender<Message>

Channel to manage internal commands from the coordinator to itself.

global_timeline: TimestampOracle<Timestamp>

Mechanism for totally ordering write and read timestamps, so that all reads reflect exactly the set of writes that precede them, and no writes that follow.
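As a rough illustration, here is a minimal sketch of such an oracle over plain u64 timestamps; SimpleOracle and its method names are hypothetical stand-ins, not the real TimestampOracle API:

struct SimpleOracle {
    write_ts: u64, // largest timestamp handed out for a write
    read_ts: u64,  // largest applied write timestamp visible to reads
}

impl SimpleOracle {
    fn new(initially: u64) -> Self {
        Self { write_ts: initially, read_ts: initially }
    }
    // Timestamp for a read: the last applied write, so the read reflects
    // exactly the writes that precede it and none that follow.
    fn read_ts(&self) -> u64 {
        self.read_ts
    }
    // Timestamp for a write: strictly greater than every prior read and
    // write, so the write is invisible to earlier reads.
    fn write_ts(&mut self) -> u64 {
        self.write_ts = self.write_ts.max(self.read_ts) + 1;
        self.write_ts
    }
    // Mark a write as applied, making it visible to subsequent reads.
    fn apply_write(&mut self, ts: u64) {
        self.read_ts = self.read_ts.max(ts);
    }
}

The key invariant is that write_ts always returns a timestamp strictly greater than any timestamp previously handed out, so an in-flight write can never be observed by a read ordered before it.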

advance_tables: AdvanceTables<Timestamp>

Tracks tables needing advancement, which can be processed at a low priority in the biased select loop.
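A minimal sketch, assuming tokio, of what such a biased select loop looks like; the channel names here are illustrative, not the coordinator's actual fields:

use tokio::sync::mpsc::UnboundedReceiver;

async fn serve_loop(
    mut cmd_rx: UnboundedReceiver<String>,
    mut internal_rx: UnboundedReceiver<String>,
    mut advance_rx: UnboundedReceiver<String>,
) {
    loop {
        tokio::select! {
            // `biased` polls arms in declaration order rather than randomly,
            // so table advancement runs only when no other work is pending.
            biased;
            Some(_cmd) = cmd_rx.recv() => { /* handle a client command */ }
            Some(_msg) = internal_rx.recv() => { /* handle an internal message */ }
            Some(_table) = advance_rx.recv() => { /* advance one table */ }
            else => break, // all channels closed
        }
    }
}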

transient_id_counter: u64

active_conns: HashMap<u32, ConnMeta>

A map from connection ID to metadata about that connection for all active connections.

read_capability: HashMap<GlobalId, ReadCapability<Timestamp>>

For each identifier, its read policy and any transaction holds on time.

Transactions should introduce and remove constraints through the methods acquire_read_holds and release_read_holds, respectively. The base policy can also be updated, though one should be sure to communicate this to the controller for it to have an effect.
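A minimal sketch of how a base policy and transaction holds might combine; the type and field names are illustrative, and real frontiers are antichains rather than single u64 values:

use std::collections::BTreeMap;

struct ReadCapabilitySketch {
    // Earliest time the base read policy keeps readable.
    base_since: u64,
    // Outstanding transaction holds: pinned time -> number of holders.
    holds: BTreeMap<u64, usize>,
}

impl ReadCapabilitySketch {
    // The frontier the controller may compact to: compaction can never
    // overrun the base policy or any outstanding hold.
    fn effective_since(&self) -> u64 {
        match self.holds.keys().next() {
            Some(&earliest_hold) => self.base_since.min(earliest_hold),
            None => self.base_since,
        }
    }
    // Pin `time` for a transaction (cf. acquire_read_holds).
    fn acquire(&mut self, time: u64) {
        *self.holds.entry(time).or_insert(0) += 1;
    }
    // Release a previously acquired hold (cf. release_read_holds).
    fn release(&mut self, time: u64) {
        if let Some(count) = self.holds.get_mut(&time) {
            *count -= 1;
            if *count == 0 {
                self.holds.remove(&time);
            }
        }
    }
}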

txn_reads: HashMap<u32, TxnReads>

For each transaction, the pinned storage and compute identifiers and time at which they are pinned.

Upon completing a transaction, this timestamp should be removed from the holds in self.read_capability[id], using the release_read_holds method.

pending_peeks: HashMap<Uuid, PendingPeek>

A map from pending peek ids to the queue into which responses are sent, and the connection id of the client that initiated the peek.

client_pending_peeks: HashMap<u32, BTreeMap<Uuid, ComputeInstanceId>>

A map from client connection ids to a set of all pending peeks for that client.

pending_tails: HashMap<GlobalId, PendingTail>

A map from pending tails to the tail description.

write_lock: Arc<Mutex<()>>

Serializes accesses to write critical sections.

write_lock_wait_group: VecDeque<Deferred>

Holds plans deferred due to write lock.

pending_writes: Vec<PendingWriteTxn>

Pending writes waiting for a group commit.

secrets_controller: Box<dyn SecretsController>

Handle to secret manager that can create and delete secrets from an arbitrary secret storage engine.

replica_sizes: ClusterReplicaSizeMap

Map of size names to corresponding compute replica sizes.

availability_zones: Vec<String>

Valid availability zones for replicas.

connection_context: ConnectionContext

Extra context to pass through to connection creation.

transient_replica_metadata: HashMap<ReplicaId, Option<ReplicaMetadata>>

Metadata about replicas that doesn’t need to be persisted. Intended for inclusion in system tables.

None is used as a tombstone value for replicas that have been dropped and for which no further updates should be recorded.
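A minimal sketch of that tombstone convention; the types are simplified stand-ins:

use std::collections::hash_map::{Entry, HashMap};

type ReplicaId = u64;
struct ReplicaMetadata { last_heartbeat: u64 }

fn record_heartbeat(
    map: &mut HashMap<ReplicaId, Option<ReplicaMetadata>>,
    id: ReplicaId,
    at: u64,
) {
    match map.entry(id) {
        Entry::Occupied(mut e) => match e.get_mut() {
            // Tombstoned: the replica was dropped; discard the update
            // instead of resurrecting the entry.
            None => {}
            Some(meta) => meta.last_heartbeat = at,
        },
        Entry::Vacant(e) => {
            e.insert(Some(ReplicaMetadata { last_heartbeat: at }));
        }
    }
}

fn drop_replica(map: &mut HashMap<ReplicaId, Option<ReplicaMetadata>>, id: ReplicaId) {
    map.insert(id, None); // leave a tombstone; no further updates recorded
}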

Implementations

Creates a new dataflow builder from the catalog and indexes in self.

Creates a new index oracle for the specified compute instance.

Implements a peek plan produced by create_plan above.

Acquire read holds on the indicated collections at the indicated time.

This method will panic if the holds cannot be acquired. In the future, it would be polite to have it return an error instead, since the failure is recoverable.

Release read holds on the indicated collections at the indicated time.

This method relies on a previous call to acquire_read_holds with the same argument, and its behavior will be erratic if called on anything else, or if called more than once on the same bundle of read holds.

Assign a timestamp for a read from a local input. Reads following writes must be at a time >= the write’s timestamp; we choose “equal to” for simplicity’s sake and to open as few new timestamps as possible.

Assign a timestamp for creating a source. Writes following reads must ensure that they are assigned a strictly larger timestamp to ensure they are not visible to any real-time earlier reads.

Assign a timestamp for a write to a local input and increase the local ts. Writes following reads must ensure that they are assigned a strictly larger timestamp to ensure they are not visible to any real-time earlier reads.
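Using the hypothetical SimpleOracle sketched earlier, these ordering rules play out as follows:

fn main() {
    let mut oracle = SimpleOracle::new(0);

    // A write is assigned a timestamp strictly larger than all prior reads,
    // so it is invisible to any read ordered before it.
    let w = oracle.write_ts();
    assert!(w > oracle.read_ts());

    // Once applied, reads following the write occur at a time >= the write's
    // timestamp; choosing "equal to" opens as few new timestamps as possible.
    oracle.apply_write(w);
    assert_eq!(oracle.read_ts(), w);
}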

Peek the current timestamp used for operations on local inputs. Used to determine how much to block group commits by.

NOTE: This can be removed once DDL is included in group commits.

Initialize the storage read policies.

This should be called only after a storage collection is created, and ideally very soon afterwards. The collection is otherwise initialized with a read policy that allows no compaction.

Initialize the compute read policies.

This should be called only after a compute collection is created, and ideally very soon afterwards. The collection is otherwise initialized with a read policy that allows no compaction.
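A minimal sketch of the compaction-window idea behind these policies (not the real read policy type): the since frontier lags the write frontier by a window such as logical_compaction_window_ms, and an absent window means compaction is held back entirely, which is the state these initializers replace:

fn allowed_since(window_ms: Option<u64>, write_frontier: u64) -> u64 {
    match window_ms {
        // No window configured: hold everything readable; compaction may
        // not advance at all.
        None => 0,
        // Allow compaction up to `window` behind the leading edge.
        Some(window) => write_frontier.saturating_sub(window),
    }
}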

Initializes coordinator state based on the contained catalog. Must be called after creating the coordinator and before calling the Coordinator::serve method.

Serves the coordinator, receiving commands from users over cmd_rx and feedback from dataflow workers over feedback_rx.

You must call bootstrap before calling this method.

Attempts to commit all pending write transactions in a group commit. If the timestamp chosen for the writes is not ahead of now(), then we can execute and commit the writes immediately. Otherwise we must wait for now() to advance past the timestamp chosen for the writes.

Commits all pending write transactions at the same timestamp. All pending writes will be combined into a single Append command and sent to STORAGE as a single batch. All writes will happen at the same timestamp and all involved tables will be advanced to some timestamp larger than the timestamp of the write.

Submit a write to be executed during the next group commit.
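A minimal sketch of this batching, reusing the hypothetical SimpleOracle from above; PendingWriteTxn here is a simplified stand-in for the real type:

type Row = String;

struct PendingWriteTxn {
    table_id: u64,
    rows: Vec<Row>,
}

fn group_commit(
    oracle: &mut SimpleOracle,
    pending: &mut Vec<PendingWriteTxn>,
) -> (u64, Vec<(u64, Row)>) {
    // One timestamp for the entire group of transactions.
    let ts = oracle.write_ts();
    // Combine every pending write into a single batch (cf. a single Append
    // command sent to STORAGE).
    let mut batch = Vec::new();
    for PendingWriteTxn { table_id, rows } in pending.drain(..) {
        for row in rows {
            batch.push((table_id, row));
        }
    }
    // Advancing the read timestamp past `ts` makes the writes visible.
    oracle.apply_write(ts);
    (ts, batch)
}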

Remove all pending peeks that were initiated by conn_id.

Verify a prepared statement is still valid.

Verify a portal is still valid.

Handles an execute command.

Instruct the dataflow layer to cancel any ongoing, interactive work for the named conn_id.

Handle termination of a client session.

This cleans up any state in the coordinator associated with the session.

Handle removing in-progress transaction state regardless of the end action of the transaction.

Removes all temporary items created by the specified connection, though not the temporary schema itself.

Return the set of ids in a timedomain and verify timeline correctness.

When a user starts a transaction, we need to prevent compaction of anything they might read from. We use a heuristic of “anything in the same database schemas with the same timeline as whatever the first query is”.

Sequence a peek, determining a timestamp and the most efficient dataflow interaction.

Peeks are sequenced by assigning a timestamp for evaluation, and then determining and deploying the most efficient evaluation plan. The peek could evaluate to a constant, be a simple read out of an existing arrangement, or require a new dataflow to build the results to return.
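A minimal sketch of those three strategies as an enum; the type and variant names are illustrative, not the coordinator's actual types:

type Row = String;

enum PeekPlanSketch {
    // The query optimized away to a literal; answer without any dataflow.
    Constant(Vec<Row>),
    // The query is a lookup against an existing arrangement (index).
    FastPath { index_id: u64 },
    // No suitable arrangement exists: build a transient dataflow, read its
    // output, then tear it down.
    Dataflow { transient_id: u64 },
}

fn describe(plan: &PeekPlanSketch) -> &'static str {
    match plan {
        PeekPlanSketch::Constant(_) => "return the constant rows directly",
        PeekPlanSketch::FastPath { .. } => "peek the existing arrangement",
        PeekPlanSketch::Dataflow { .. } => "ship a dataflow, peek it, then drop it",
    }
}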

The smallest common valid read frontier among the specified collections.

The smallest common valid write frontier among the specified collections.

Times that are not greater than or equal to this frontier are complete for all collections identified as arguments.

Determines the timestamp for a query.

Timestamp determination may fail due to the restricted validity of traces. Each trace has a since and an upper frontier: it is valid only for times at or after since, and it is sure to be available only for times not at or after upper.

The set of storage and compute IDs used when determining the timestamp are also returned.
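A minimal sketch of picking a query timestamp from those validity bounds. Real frontiers are antichains; this sketch assumes totally ordered u64 timestamps:

fn determine_timestamp(sinces: &[u64], uppers: &[u64]) -> u64 {
    // Reads are valid only at or after every since frontier.
    let least_valid_read = sinces.iter().copied().max().unwrap_or(0);
    // Times strictly before every upper are complete for all collections;
    // prefer the freshest such time.
    let newest_complete = uppers
        .iter()
        .copied()
        .min()
        .unwrap_or(u64::MAX)
        .saturating_sub(1);
    // If compaction has advanced past the complete times, the query must use
    // a later timestamp and wait for that data to become available.
    newest_complete.max(least_valid_read)
}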

Perform a catalog transaction. The closure is passed a CatalogTxn made from the prospective CatalogState (i.e., the Catalog with ops applied but before the transaction is committed). The closure can return an error to abort the transaction, or otherwise return a value that is returned by this function. This allows callers to error while building DataflowDescs. Coordinator::ship_dataflow must be called after this function successfully returns on any built DataflowDesc.
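A minimal sketch of this closure-based pattern: the closure sees the prospective state with ops applied but nothing committed, and can abort by returning Err. The types here are illustrative stand-ins:

#[derive(Clone)]
struct CatalogStateSketch { items: Vec<String> }

fn catalog_transact<T, F>(
    state: &mut CatalogStateSketch,
    ops: Vec<String>,
    f: F,
) -> Result<T, String>
where
    F: FnOnce(&CatalogStateSketch) -> Result<T, String>,
{
    // Apply ops to a prospective copy, not the committed state.
    let mut prospective = state.clone();
    prospective.items.extend(ops);
    // Let the caller validate (e.g., build dataflow descriptions); an Err
    // aborts the transaction and leaves the committed state untouched.
    let value = f(&prospective)?;
    // Only commit once the closure succeeds.
    *state = prospective;
    Ok(value)
}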

Finalizes a dataflow and then broadcasts it to all workers. Utility method for the more general Self::ship_dataflows.

Finalizes a list of dataflows and then broadcasts it to all workers.

Finalizes a dataflow.

Finalization includes optimization, but also validation of various invariants such as ensuring that the as_of frontier is in advance of the various since frontiers of participating data inputs.

In particular, there are requirements on the as_of field of the dataflow and on the since frontiers of created arrangements, as a function of the since frontiers of the dataflow's inputs (sources and imported arrangements).

Panics

Panics if as_of is < the since frontiers.

Panics if the dataflow descriptions contain an invalid plan.

Return an error if the ids are from incompatible timelines. This should be used to prevent users from doing things that are either meaningless (joining data from timelines that have similar numbers with different meanings, like two separate Debezium topics) or will never complete (joining cdcv2 and realtime data).
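A minimal sketch of such a check: collections may only be combined if they agree on a single timeline. The enum and function are illustrative stand-ins:

#[derive(PartialEq, Eq, Debug, Clone)]
enum TimelineSketch { EpochMillis, External(String), User(String) }

fn validate_timeline(
    timelines: impl IntoIterator<Item = TimelineSketch>,
) -> Result<Option<TimelineSketch>, String> {
    let mut found: Option<TimelineSketch> = None;
    for t in timelines {
        if let Some(prev) = &found {
            // Timestamps from different timelines are numerically similar
            // but incomparable, so refuse to mix them.
            if *prev != t {
                return Err(format!("incompatible timelines: {prev:?} vs {t:?}"));
            }
        } else {
            found = Some(t);
        }
    }
    Ok(found) // None means no timeline-bound collections were involved
}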

Attempts to immediately grant session access to the write lock or errors if the lock is currently held.

Defers executing deferred until the write lock becomes available; waiting occurs in a green-thread, so callers of this function likely want to return after calling it.
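A minimal sketch, assuming tokio, of this try-or-defer pattern: grant the lock immediately when it is free; otherwise park the work on a spawned task that waits for the lock, so the caller can return right away:

use std::sync::Arc;
use tokio::sync::{Mutex, OwnedMutexGuard};

fn grant_or_defer<F>(write_lock: Arc<Mutex<()>>, deferred: F)
where
    F: FnOnce(OwnedMutexGuard<()>) + Send + 'static,
{
    match write_lock.clone().try_lock_owned() {
        // The lock is free: grant it and run the work immediately.
        Ok(guard) => deferred(guard),
        // The lock is held: wait on a spawned (green) task so the caller
        // can return right away, mirroring the deferral described above.
        Err(_) => {
            tokio::spawn(async move {
                let guard = write_lock.lock_owned().await;
                deferred(guard);
            });
        }
    }
}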
