Struct mz_coord::coord::Coordinator
pub struct Coordinator<S> {
dataflow_client: Controller,
view_optimizer: Optimizer,
catalog: Catalog<S>,
logical_compaction_window_ms: Option<Timestamp>,
internal_cmd_tx: UnboundedSender<Message>,
global_timeline: TimestampOracle<Timestamp>,
advance_tables: AdvanceTables<Timestamp>,
transient_id_counter: u64,
active_conns: HashMap<u32, ConnMeta>,
read_capability: HashMap<GlobalId, ReadCapability<Timestamp>>,
txn_reads: HashMap<u32, TxnReads>,
pending_peeks: HashMap<Uuid, PendingPeek>,
client_pending_peeks: HashMap<u32, BTreeMap<Uuid, ComputeInstanceId>>,
pending_tails: HashMap<GlobalId, PendingTail>,
write_lock: Arc<Mutex<()>>,
write_lock_wait_group: VecDeque<Deferred>,
pending_writes: Vec<PendingWriteTxn>,
secrets_controller: Box<dyn SecretsController>,
replica_sizes: ClusterReplicaSizeMap,
availability_zones: Vec<String>,
connection_context: ConnectionContext,
transient_replica_metadata: HashMap<ReplicaId, Option<ReplicaMetadata>>,
}
Glues the external world to the Timely workers.
Fields
dataflow_client: Controller
A client to a running dataflow cluster.
This component offers:
- Sufficient isolation from COMPUTE, so long as communication with COMPUTE replicas is non-blocking.
- Insufficient isolation from STORAGE. The ADAPTER cannot tolerate failure of STORAGE services.
view_optimizer: Optimizer
Optimizer instance for logical optimization of views.
catalog: Catalog<S>
logical_compaction_window_ms: Option<Timestamp>
Delta from leading edge of an arrangement from which we allow compaction.
internal_cmd_tx: UnboundedSender<Message>
Channel to manage internal commands from the coordinator to itself.
global_timeline: TimestampOracle<Timestamp>
Mechanism for totally ordering write and read timestamps, so that all reads reflect exactly the set of writes that precede them, and no writes that follow.
advance_tables: AdvanceTables<Timestamp>
Tracks tables needing advancement, which can be processed at a low priority in the biased select loop.
transient_id_counter: u64
active_conns: HashMap<u32, ConnMeta>
A map from connection ID to metadata about that connection for all active connections.
read_capability: HashMap<GlobalId, ReadCapability<Timestamp>>
For each identifier, its read policy and any transaction holds on time.
Transactions should introduce and remove constraints through the methods acquire_read_holds and release_read_holds, respectively. The base policy can also be updated, though one should be sure to communicate this to the controller for it to have an effect.
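The interplay between the base compaction policy and transaction holds described above can be sketched as follows. This is an illustrative model only, assuming scalar u64 timestamps; the real ReadCapability and its frontier types differ.

```rust
use std::collections::BTreeMap;

/// Illustrative sketch: a collection's effective read frontier is the
/// minimum of its base compaction policy and any outstanding holds.
struct ReadCapability {
    /// Frontier implied by the compaction policy (e.g. `now - window`).
    base_policy: u64,
    /// Multiset of times pinned by open transactions (time -> hold count).
    holds: BTreeMap<u64, usize>,
}

impl ReadCapability {
    /// The frontier reported to the controller: compaction may pass
    /// neither the base policy nor the earliest transaction hold.
    fn effective_frontier(&self) -> u64 {
        match self.holds.keys().next() {
            Some(&earliest_hold) => self.base_policy.min(earliest_hold),
            None => self.base_policy,
        }
    }

    fn acquire_hold(&mut self, time: u64) {
        *self.holds.entry(time).or_insert(0) += 1;
    }

    fn release_hold(&mut self, time: u64) {
        if let Some(count) = self.holds.get_mut(&time) {
            *count -= 1;
            if *count == 0 {
                self.holds.remove(&time);
            }
        }
    }
}

fn main() {
    let mut cap = ReadCapability { base_policy: 100, holds: BTreeMap::new() };
    cap.acquire_hold(42); // a transaction pins time 42
    assert_eq!(cap.effective_frontier(), 42);
    cap.release_hold(42); // transaction ends; the policy governs again
    assert_eq!(cap.effective_frontier(), 100);
}
```

Releasing a hold without a matching acquire would corrupt the multiset, which mirrors why release_read_holds must be paired with a prior acquire_read_holds.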
txn_reads: HashMap<u32, TxnReads>
For each transaction, the pinned storage and compute identifiers and time at which they are pinned.
Upon completing a transaction, this timestamp should be removed from the holds in self.read_capability[id], using the release_read_holds method.
pending_peeks: HashMap<Uuid, PendingPeek>
A map from pending peek ids to the queue into which responses are sent, and the connection id of the client that initiated the peek.
client_pending_peeks: HashMap<u32, BTreeMap<Uuid, ComputeInstanceId>>
A map from client connection ids to the set of all pending peeks for that client.
pending_tails: HashMap<GlobalId, PendingTail>
A map from pending tails to the tail description.
write_lock: Arc<Mutex<()>>
Serializes accesses to write critical sections.
write_lock_wait_group: VecDeque<Deferred>
Holds plans deferred due to write lock.
pending_writes: Vec<PendingWriteTxn>
Pending writes waiting for a group commit.
secrets_controller: Box<dyn SecretsController>
Handle to secret manager that can create and delete secrets from an arbitrary secret storage engine.
replica_sizes: ClusterReplicaSizeMap
Map of strings to corresponding compute replica sizes.
availability_zones: Vec<String>
Valid availability zones for replicas.
connection_context: ConnectionContext
Extra context to pass through to connection creation.
transient_replica_metadata: HashMap<ReplicaId, Option<ReplicaMetadata>>
Metadata about replicas that doesn’t need to be persisted. Intended for inclusion in system tables.
None is used as a tombstone value for replicas that have been dropped and for which no further updates should be recorded.
Implementations
impl<S: Append> Coordinator<S>
pub fn dataflow_builder(
    &self,
    instance: ComputeInstanceId
) -> DataflowBuilder<'_, Timestamp>
Creates a new dataflow builder from the catalog and indexes in self.
impl<S: Append> Coordinator<S>
pub fn index_oracle(
    &self,
    instance: ComputeInstanceId
) -> ComputeInstanceIndexOracle<'_, Timestamp>
Creates a new index oracle for the specified compute instance.
impl<S: Append + 'static> Coordinator<S>
pub async fn implement_fast_path_peek(
    &mut self,
    fast_path: Plan,
    timestamp: Timestamp,
    finishing: RowSetFinishing,
    conn_id: u32,
    source_arity: usize,
    compute_instance: ComputeInstanceId,
    target_replica: Option<ReplicaId>
) -> Result<ExecuteResponse, CoordError>
Implements a peek plan produced by create_plan above.
impl<S> Coordinator<S>
pub(super) async fn acquire_read_holds(
    &mut self,
    read_holds: &ReadHolds<Timestamp>
)
Acquire read holds on the indicated collections at the indicated time.
This method will panic if the holds cannot be acquired. In the future, it would be polite to have it error instead, as it is not unrecoverable.
pub(super) async fn release_read_hold(
    &mut self,
    read_holds: ReadHolds<Timestamp>
)
Release read holds on the indicated collections at the indicated time.
This method relies on a previous call to acquire_read_holds with the same argument, and its behavior will be erratic if called on anything else, or if called more than once on the same bundle of read holds.
impl<S: Append + 'static> Coordinator<S>
fn get_local_read_ts(&mut self) -> Timestamp
Assign a timestamp for a read from a local input. Reads following writes must be at a time >= the write’s timestamp; we choose “equal to” for simplicity’s sake and to open as few new timestamps as possible.
fn get_local_write_ts(&mut self) -> Timestamp
Assign a timestamp for creating a source. Writes following reads must ensure that they are assigned a strictly larger timestamp to ensure they are not visible to any real-time earlier reads.
fn get_and_step_local_write_ts(&mut self) -> WriteTimestamp
Assign a timestamp for a write to a local input and increase the local ts. Writes following reads must ensure that they are assigned a strictly larger timestamp to ensure they are not visible to any real-time earlier reads.
fn peek_local_ts(&self) -> Timestamp
Peek the current timestamp used for operations on local inputs. Used to determine how much to block group commits by.
NOTE: This can be removed once DDL is included in group commits.
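The read/write timestamp discipline described above (reads equal to the latest write; writes strictly after the latest read) can be sketched with a toy oracle. This is an assumed model using scalar u64 timestamps, not the real TimestampOracle.

```rust
/// Toy timestamp oracle: totally orders reads and writes so that a read
/// sees exactly the writes that precede it, and no writes that follow.
struct ToyOracle {
    state: OracleState,
}

enum OracleState {
    /// The last operation was a write at this time.
    Writing(u64),
    /// The last operation was a read at this time.
    Reading(u64),
}

impl ToyOracle {
    fn new(initially: u64) -> Self {
        ToyOracle { state: OracleState::Writing(initially) }
    }

    /// Reads following writes may share the write's timestamp ("equal
    /// to" for simplicity, opening as few new timestamps as possible).
    fn read_ts(&mut self) -> u64 {
        let ts = match self.state {
            OracleState::Writing(ts) | OracleState::Reading(ts) => ts,
        };
        self.state = OracleState::Reading(ts);
        ts
    }

    /// Writes following reads take a strictly larger timestamp, so they
    /// are invisible to any real-time earlier read.
    fn write_ts(&mut self) -> u64 {
        let ts = match self.state {
            OracleState::Writing(ts) => ts,      // writes may batch at one time
            OracleState::Reading(ts) => ts + 1,  // strictly after the read
        };
        self.state = OracleState::Writing(ts);
        ts
    }
}

fn main() {
    let mut oracle = ToyOracle::new(0);
    let w1 = oracle.write_ts(); // 0
    let r1 = oracle.read_ts();  // 0: observes the write at 0
    let w2 = oracle.write_ts(); // 1: not visible to the read at 0
    assert_eq!((w1, r1, w2), (0, 0, 1));
}
```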
fn local_fast_forward(&mut self, lower_bound: Timestamp)
fn now(&self) -> EpochMillis
fn now_datetime(&self) -> DateTime<Utc>
async fn initialize_storage_read_policies(
    &mut self,
    ids: Vec<GlobalId>,
    compaction_window_ms: Option<Timestamp>
)
Initialize the storage read policies.
This should be called only after a storage collection is created, and ideally very soon afterwards. The collection is otherwise initialized with a read policy that allows no compaction.
async fn initialize_compute_read_policies(
    &mut self,
    ids: Vec<GlobalId>,
    instance: ComputeInstanceId,
    compaction_window_ms: Option<Timestamp>
)
Initialize the compute read policies.
This should be called only after a compute collection is created, and ideally very soon afterwards. The collection is otherwise initialized with a read policy that allows no compaction.
async fn bootstrap(
    &mut self,
    builtin_table_updates: Vec<BuiltinTableUpdate>
) -> Result<(), CoordError>
Initializes coordinator state based on the contained catalog. Must be called after creating the coordinator and before calling the Coordinator::serve method.
async fn serve(
    self,
    internal_cmd_rx: UnboundedReceiver<Message>,
    cmd_rx: UnboundedReceiver<Command>
)
Serves the coordinator, receiving commands from users over cmd_rx and feedback from dataflow workers over feedback_rx.

You must call bootstrap before calling this method.
async fn advance_local_inputs(&mut self, advance_to: Timestamp)
async fn advance_local_input(&mut self, inputs: AdvanceLocalInput<Timestamp>)
async fn try_group_commit(&mut self)
Attempts to commit all pending write transactions in a group commit. If the timestamp chosen for the writes is not ahead of now(), then we can execute and commit the writes immediately. Otherwise we must wait for now() to advance past the timestamp chosen for the writes.
async fn group_commit(&mut self)
Commits all pending write transactions at the same timestamp. All pending writes will be combined into a single Append command and sent to STORAGE as a single batch. All writes will happen at the same timestamp and all involved tables will be advanced to some timestamp larger than the timestamp of the write.
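The batching behavior described above can be sketched as follows; the types (PendingWrite, the per-table batch map) are illustrative assumptions, not the real PendingWriteTxn or Append command.

```rust
use std::collections::BTreeMap;

/// Illustrative pending write: a row update against one table.
struct PendingWrite {
    table: u64,
    row: String,
    diff: i64,
}

/// Sketch of a group commit: combine all pending writes into one batch
/// per table, all at the same timestamp, and advance every involved
/// table's upper strictly past that timestamp.
fn group_commit(
    pending: Vec<PendingWrite>,
    write_ts: u64,
) -> (BTreeMap<u64, Vec<(String, u64, i64)>>, u64) {
    let mut batches: BTreeMap<u64, Vec<(String, u64, i64)>> = BTreeMap::new();
    for w in pending {
        batches.entry(w.table).or_default().push((w.row, write_ts, w.diff));
    }
    // The new upper: some timestamp strictly larger than the write time.
    (batches, write_ts + 1)
}

fn main() {
    let pending = vec![
        PendingWrite { table: 1, row: "a".into(), diff: 1 },
        PendingWrite { table: 1, row: "b".into(), diff: 1 },
        PendingWrite { table: 2, row: "c".into(), diff: -1 },
    ];
    let (batches, new_upper) = group_commit(pending, 7);
    assert_eq!(batches[&1].len(), 2);              // two writes share table 1
    assert_eq!(batches[&2], vec![("c".to_string(), 7, -1)]);
    assert_eq!(new_upper, 8);                      // tables advance past 7
}
```

Committing at a single shared timestamp is what makes the group atomic from a reader's perspective: a read at or after the write timestamp sees all of the writes, a read before it sees none.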
fn submit_write(&mut self, pending_write_txn: PendingWriteTxn)
Submit a write to be executed during the next group commit.
async fn message_controller(&mut self, message: ControllerResponse)
async fn message_create_source_statement_ready(
&mut self,
__arg1: CreateSourceStatementReady
)
async fn message_sink_connection_ready(&mut self, __arg1: SinkConnectionReady)
fn message_send_diffs(&mut self, _: SendDiffs)
async fn remove_pending_peeks(&mut self, conn_id: u32) -> Vec<PendingPeek>
Remove all pending peeks that were initiated by conn_id.
async fn message_command(&mut self, cmd: Command)
async fn message_compute_instance_status(&mut self, event: ComputeInstanceEvent)
async fn handle_statement(
&mut self,
session: &mut Session,
stmt: Statement<Aug>,
params: &Params
) -> Result<Plan, CoordError>
fn handle_declare(
&self,
session: &mut Session,
name: String,
stmt: Statement<Raw>,
param_types: Vec<Option<ScalarType>>
) -> Result<(), CoordError>
fn handle_describe(
&self,
session: &mut Session,
name: String,
stmt: Option<Statement<Raw>>,
param_types: Vec<Option<ScalarType>>
) -> Result<(), CoordError>
fn describe(
&self,
session: &Session,
stmt: Option<Statement<Raw>>,
param_types: Vec<Option<ScalarType>>
) -> Result<StatementDesc, CoordError>
fn verify_prepared_statement(
    &self,
    session: &mut Session,
    name: &str
) -> Result<(), CoordError>
Verify a prepared statement is still valid.
fn verify_portal(
    &self,
    session: &mut Session,
    name: &str
) -> Result<(), CoordError>
Verify a portal is still valid.
fn verify_statement_revision(
&self,
session: &Session,
stmt: Option<&Statement<Raw>>,
desc: &StatementDesc,
catalog_revision: u64
) -> Result<Option<u64>, CoordError>
async fn handle_execute(
    &mut self,
    portal_name: String,
    session: Session,
    tx: ClientTransmitter<ExecuteResponse>
)
Handles an execute command.
async fn handle_execute_inner(
&mut self,
stmt: Statement<Raw>,
params: Params,
session: Session,
tx: ClientTransmitter<ExecuteResponse>
)
async fn handle_cancel(&mut self, conn_id: u32, secret_key: u32)
Instruct the dataflow layer to cancel any ongoing, interactive work for the named conn_id.
async fn handle_terminate(&mut self, session: &mut Session)
Handle termination of a client session.
This cleans up any state in the coordinator associated with the session.
async fn clear_transaction(
    &mut self,
    session: &mut Session
) -> TransactionStatus<Timestamp>
Handle removing in-progress transaction state regardless of the end action of the transaction.
async fn drop_temp_items(&mut self, session: &Session)
Removes all temporary items created by the specified connection, though not the temporary schema itself.
async fn handle_sink_connection_ready(
&mut self,
id: GlobalId,
oid: u32,
connection: SinkConnection,
compute_instance: ComputeInstanceId,
session: Option<&Session>
) -> Result<(), CoordError>
async fn sequence_plan(
&mut self,
tx: ClientTransmitter<ExecuteResponse>,
session: Session,
plan: Plan,
depends_on: Vec<GlobalId>
)
fn sequence_execute(
&mut self,
session: &mut Session,
plan: ExecutePlan
) -> Result<String, CoordError>
async fn sequence_create_connection(
&mut self,
session: &Session,
plan: CreateConnectionPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_database(
&mut self,
session: &Session,
plan: CreateDatabasePlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_schema(
&mut self,
session: &Session,
plan: CreateSchemaPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_role(
&mut self,
session: &Session,
plan: CreateRolePlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_compute_instance(
&mut self,
session: &Session,
__arg2: CreateComputeInstancePlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_compute_instance_replica(
&mut self,
session: &Session,
__arg2: CreateComputeInstanceReplicaPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_secret(
&mut self,
session: &Session,
plan: CreateSecretPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_table(
&mut self,
session: &Session,
plan: CreateTablePlan,
depends_on: Vec<GlobalId>
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_source(
&mut self,
session: &mut Session,
plan: CreateSourcePlan,
depends_on: Vec<GlobalId>
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_sink(
&mut self,
session: Session,
plan: CreateSinkPlan,
depends_on: Vec<GlobalId>,
tx: ClientTransmitter<ExecuteResponse>
)
async fn generate_view_ops(
&mut self,
session: &Session,
name: QualifiedObjectName,
view: View,
replace: Option<GlobalId>,
materialize: bool,
depends_on: Vec<GlobalId>
) -> Result<(Vec<Op>, Option<(GlobalId, ComputeInstanceId)>), CoordError>
async fn sequence_create_view(
&mut self,
session: &Session,
plan: CreateViewPlan,
depends_on: Vec<GlobalId>
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_views(
&mut self,
session: &mut Session,
plan: CreateViewsPlan,
depends_on: Vec<GlobalId>
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_recorded_view(
&mut self,
_session: &Session,
_plan: CreateRecordedViewPlan,
_depends_on: Vec<GlobalId>
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_index(
&mut self,
session: &Session,
plan: CreateIndexPlan,
depends_on: Vec<GlobalId>
) -> Result<ExecuteResponse, CoordError>
async fn sequence_create_type(
&mut self,
session: &Session,
plan: CreateTypePlan,
depends_on: Vec<GlobalId>
) -> Result<ExecuteResponse, CoordError>
async fn sequence_drop_database(
&mut self,
session: &Session,
plan: DropDatabasePlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_drop_schema(
&mut self,
session: &Session,
plan: DropSchemaPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_drop_roles(
&mut self,
session: &Session,
plan: DropRolesPlan
) -> Result<ExecuteResponse, CoordError>
async fn drop_replica(
&mut self,
instance_id: ComputeInstanceId,
replica_id: ReplicaId,
replica_config: ConcreteComputeInstanceReplicaConfig
) -> Result<(), Error>
async fn sequence_drop_compute_instances(
&mut self,
session: &Session,
plan: DropComputeInstancesPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_drop_compute_instance_replica(
&mut self,
session: &Session,
__arg2: DropComputeInstanceReplicaPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_drop_items(
&mut self,
session: &Session,
plan: DropItemsPlan
) -> Result<ExecuteResponse, CoordError>
fn sequence_show_all_variables(
&mut self,
session: &Session
) -> Result<ExecuteResponse, CoordError>
fn sequence_show_variable(
&self,
session: &Session,
plan: ShowVariablePlan
) -> Result<ExecuteResponse, CoordError>
fn sequence_set_variable(
&self,
session: &mut Session,
plan: SetVariablePlan
) -> Result<ExecuteResponse, CoordError>
fn sequence_reset_variable(
&self,
session: &mut Session,
plan: ResetVariablePlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_end_transaction(
&mut self,
tx: ClientTransmitter<ExecuteResponse>,
session: Session,
action: EndTransactionAction
)
async fn sequence_end_transaction_inner(
&mut self,
session: &mut Session,
action: EndTransactionAction
) -> Result<(Option<Vec<WriteOp>>, Option<OwnedMutexGuard<()>>), CoordError>
fn timedomain_for<'a, I>(
    &self,
    uses_ids: I,
    timeline: &Option<Timeline>,
    conn_id: u32,
    compute_instance: ComputeInstanceId
) -> Result<CollectionIdBundle, CoordError> where
    I: IntoIterator<Item = &'a GlobalId>,
Return the set of ids in a timedomain and verify timeline correctness.
When a user starts a transaction, we need to prevent compaction of anything they might read from. We use a heuristic of “anything in the same database schemas with the same timeline as whatever the first query is”.
async fn sequence_peek(
    &mut self,
    session: &mut Session,
    plan: PeekPlan
) -> Result<ExecuteResponse, CoordError>
Sequence a peek, determining a timestamp and the most efficient dataflow interaction.
Peeks are sequenced by assigning a timestamp for evaluation, and then determining and deploying the most efficient evaluation plan. The peek could evaluate to a constant, be a simple read out of an existing arrangement, or require a new dataflow to build the results to return.
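The three evaluation strategies described above can be sketched as a decision enum; the names and the inputs to the decision are illustrative assumptions, not the real fast-path planner API.

```rust
/// Illustrative peek evaluation strategies, cheapest first.
enum PeekStrategy {
    /// The query reduced to a constant collection; no dataflow needed.
    Constant(Vec<String>),
    /// The query is a direct read out of an existing arrangement.
    ExistingArrangement { index_id: u64 },
    /// Otherwise, a transient dataflow must be built, read, and dropped.
    NewDataflow,
}

/// Pick the cheapest strategy that applies.
fn choose_strategy(is_constant: bool, matching_index: Option<u64>) -> PeekStrategy {
    if is_constant {
        PeekStrategy::Constant(Vec::new())
    } else if let Some(index_id) = matching_index {
        PeekStrategy::ExistingArrangement { index_id }
    } else {
        PeekStrategy::NewDataflow
    }
}

fn main() {
    assert!(matches!(choose_strategy(true, None), PeekStrategy::Constant(_)));
    assert!(matches!(
        choose_strategy(false, Some(9)),
        PeekStrategy::ExistingArrangement { index_id: 9 }
    ));
    assert!(matches!(choose_strategy(false, None), PeekStrategy::NewDataflow));
}
```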
async fn sequence_tail(
&mut self,
session: &mut Session,
plan: TailPlan,
depends_on: Vec<GlobalId>
) -> Result<ExecuteResponse, CoordError>
fn least_valid_read(
    &self,
    id_bundle: &CollectionIdBundle,
    instance: ComputeInstanceId
) -> Antichain<Timestamp>
The smallest common valid read frontier among the specified collections.
fn least_valid_write(
    &self,
    id_bundle: &CollectionIdBundle,
    instance: ComputeInstanceId
) -> Antichain<Timestamp>
The smallest common valid write frontier among the specified collections.
Times that are not greater or equal to this frontier are complete for all collections identified as arguments.
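With totally ordered scalar timestamps, an antichain frontier collapses to a single time, and the two frontier computations above can be sketched directly; this is an assumed simplification of the real Antichain-based code.

```rust
/// Smallest time that is >= every collection's `since`: the earliest
/// timestamp at which all of the collections remain readable.
fn least_valid_read(sinces: &[u64]) -> u64 {
    sinces.iter().copied().max().unwrap_or(0)
}

/// Smallest `upper` among the collections: every time strictly below it
/// is complete (fully written) in all of the collections.
fn least_valid_write(uppers: &[u64]) -> u64 {
    uppers.iter().copied().min().unwrap_or(0)
}

fn main() {
    let sinces = [3, 5, 4];
    let uppers = [9, 12, 10];
    assert_eq!(least_valid_read(&sinces), 5);
    assert_eq!(least_valid_write(&uppers), 9);
    // Any timestamp in [5, 9) is both readable and complete everywhere,
    // which is exactly the window timestamp determination aims for.
}
```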
fn determine_timestamp(
    &mut self,
    session: &Session,
    id_bundle: &CollectionIdBundle,
    when: QueryWhen,
    compute_instance: ComputeInstanceId
) -> Result<Timestamp, CoordError>
Determines the timestamp for a query.
Timestamp determination may fail due to the restricted validity of traces. Each trace has a since and an upper frontier; it is valid only after since and sure to be available not beyond upper.

The set of storage and compute IDs used when determining the timestamp is also returned.
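The validity constraint above can be sketched with scalar timestamps: the chosen time is advanced to at least since, and rejected if it would reach upper. This is an illustrative model, not the real determine_timestamp logic.

```rust
/// Illustrative timestamp determination: the result must lie in the
/// half-open interval [since, upper) to be both valid and complete.
fn determine_timestamp(candidate: u64, since: u64, upper: u64) -> Result<u64, String> {
    // Never read before `since`: those times may already be compacted.
    let ts = candidate.max(since);
    if ts < upper {
        Ok(ts)
    } else {
        // Times at or past `upper` are not yet sure to be available.
        Err(format!("timestamp {} not yet available (upper = {})", ts, upper))
    }
}

fn main() {
    assert_eq!(determine_timestamp(2, 5, 10), Ok(5));  // advanced up to since
    assert_eq!(determine_timestamp(7, 5, 10), Ok(7));  // candidate already valid
    assert!(determine_timestamp(12, 5, 10).is_err());  // beyond upper: fail
}
```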
fn sequence_explain(
&mut self,
session: &Session,
plan: ExplainPlan
) -> Result<ExecuteResponse, CoordError>
fn sequence_explain_new(
&mut self,
_session: &Session,
_plan: ExplainPlanNew
) -> Result<ExecuteResponse, CoordError>
fn sequence_explain_old(
&mut self,
session: &Session,
plan: ExplainPlanOld
) -> Result<ExecuteResponse, CoordError>
fn sequence_send_diffs(
&mut self,
session: &mut Session,
plan: SendDiffsPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_insert(
&mut self,
tx: ClientTransmitter<ExecuteResponse>,
session: Session,
plan: InsertPlan
)
fn sequence_insert_constant(
&mut self,
session: &mut Session,
id: GlobalId,
constants: MirRelationExpr
) -> Result<ExecuteResponse, CoordError>
fn sequence_copy_rows(
&mut self,
session: &mut Session,
id: GlobalId,
columns: Vec<usize>,
rows: Vec<Row>
) -> Result<ExecuteResponse, CoordError>
async fn sequence_read_then_write(
&mut self,
tx: ClientTransmitter<ExecuteResponse>,
session: Session,
plan: ReadThenWritePlan
)
async fn sequence_alter_item_rename(
&mut self,
session: &Session,
plan: AlterItemRenamePlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_alter_index_set_options(
&mut self,
plan: AlterIndexSetOptionsPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_alter_index_reset_options(
&mut self,
plan: AlterIndexResetOptionsPlan
) -> Result<ExecuteResponse, CoordError>
async fn sequence_alter_secret(
&mut self,
session: &Session,
plan: AlterSecretPlan
) -> Result<ExecuteResponse, CoordError>
fn extract_secret(
&mut self,
session: &Session,
secret_as: &mut MirScalarExpr
) -> Result<Vec<u8>, CoordError>
async fn catalog_transact<F, R>(
    &mut self,
    session: Option<&Session>,
    ops: Vec<Op>,
    f: F
) -> Result<R, CoordError> where
    F: FnOnce(CatalogTxn<'_, Timestamp>) -> Result<R, CoordError>,
Perform a catalog transaction. The closure is passed a CatalogTxn made from the prospective CatalogState (i.e., the Catalog with ops applied but before the transaction is committed). The closure can return an error to abort the transaction, or otherwise return a value that is returned by this function. This allows callers to error while building DataflowDescs. Coordinator::ship_dataflow must be called after this function successfully returns on any built DataflowDesc.
async fn send_builtin_table_updates(&mut self, updates: Vec<BuiltinTableUpdate>)
async fn drop_sinks(&mut self, sinks: Vec<(ComputeInstanceId, GlobalId)>)
async fn drop_indexes(&mut self, indexes: Vec<(ComputeInstanceId, GlobalId)>)
async fn set_index_options(
&mut self,
id: GlobalId,
options: Vec<IndexOption>
) -> Result<(), CoordError>
async fn drop_secrets(&mut self, secrets: Vec<GlobalId>)
async fn ship_dataflow(
    &mut self,
    dataflow: DataflowDesc,
    instance: ComputeInstanceId
)
Finalizes a dataflow and then broadcasts it to all workers. Utility method for the more general Self::ship_dataflows.
async fn ship_dataflows(
    &mut self,
    dataflows: Vec<DataflowDesc>,
    instance: ComputeInstanceId
)
Finalizes a list of dataflows and then broadcasts them to all workers.
fn finalize_dataflow(
    &self,
    dataflow: DataflowDesc,
    compute_instance: ComputeInstanceId
) -> DataflowDescription<Plan>
Finalizes a dataflow.
Finalization includes optimization, but also validation of various invariants, such as ensuring that the as_of frontier is in advance of the various since frontiers of participating data inputs.

In particular, there are requirements on the as_of field for the dataflow and the since frontiers of created arrangements, as a function of the since frontiers of dataflow inputs (sources and imported arrangements).

Panics

Panics if as_of is < the since frontiers.
Panics if the dataflow descriptions contain an invalid plan.
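The as_of-vs-since invariant can be sketched with scalar timestamps; here validation returns an error rather than panicking, and the names are illustrative assumptions rather than the real finalization code.

```rust
/// Illustrative check: a dataflow's `as_of` must be in advance of every
/// input's `since`, otherwise an input may already be compacted at
/// `as_of` and the dataflow could not be computed correctly from it.
fn validate_as_of(as_of: u64, input_sinces: &[u64]) -> Result<(), String> {
    for &since in input_sinces {
        if as_of < since {
            return Err(format!(
                "as_of {} is not in advance of input since {}",
                as_of, since
            ));
        }
    }
    Ok(())
}

fn main() {
    assert!(validate_as_of(10, &[3, 7, 10]).is_ok()); // >= every since
    assert!(validate_as_of(5, &[3, 7]).is_err());     // input compacted past 5
}
```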
fn allocate_transient_id(&mut self) -> Result<GlobalId, CoordError>
fn validate_timeline<I>(&self, ids: I) -> Result<Option<Timeline>, CoordError> where
    I: IntoIterator<Item = GlobalId>,
Return an error if the ids are from incompatible timelines. This should be used to prevent users from doing things that are either meaningless (joining data from timelines that have similar numbers with different meanings, like two separate debezium topics) or will never complete (joining cdcv2 and realtime data).
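The check described above can be sketched as follows; the id-to-timeline map and the string timeline names are illustrative stand-ins for the catalog lookup and the real Timeline type.

```rust
use std::collections::{BTreeSet, HashMap};

/// Illustrative timeline validation: collect the distinct timelines of
/// the given ids and error if more than one appears.
fn validate_timeline(
    timelines: &HashMap<u64, &'static str>, // id -> timeline (hypothetical)
    ids: &[u64],
) -> Result<Option<&'static str>, String> {
    let found: BTreeSet<&'static str> = ids
        .iter()
        .filter_map(|id| timelines.get(id).copied())
        .collect();
    if found.len() > 1 {
        return Err(format!("incompatible timelines: {:?}", found));
    }
    Ok(found.into_iter().next())
}

fn main() {
    let mut t: HashMap<u64, &'static str> = HashMap::new();
    t.insert(1, "EpochMilliseconds");
    t.insert(2, "EpochMilliseconds");
    t.insert(3, "User(cdcv2)");
    // Same timeline: fine, and the shared timeline is reported.
    assert_eq!(validate_timeline(&t, &[1, 2]), Ok(Some("EpochMilliseconds")));
    // Mixing realtime and cdcv2 data: rejected.
    assert!(validate_timeline(&t, &[1, 3]).is_err());
}
```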
fn try_grant_session_write_lock(
    &self,
    session: &mut Session
) -> Result<(), TryLockError>
Attempts to immediately grant session access to the write lock or errors if the lock is currently held.
fn defer_write(&mut self, deferred: Deferred)
Defers executing deferred until the write lock becomes available; waiting occurs in a green-thread, so callers of this function likely want to return after calling it.
Auto Trait Implementations
impl<S> !RefUnwindSafe for Coordinator<S>
impl<S> !Send for Coordinator<S>
impl<S> !Sync for Coordinator<S>
impl<S> Unpin for Coordinator<S>
impl<S> !UnwindSafe for Coordinator<S>
Blanket Implementations
impl<T> BorrowMut<T> for T where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value.
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request.
impl<P, R> ProtoType<R> for P where
    R: RustType<P>,
fn into_rust(self) -> Result<R, TryFromProtoError>
See RustType::from_proto.
fn from_rust(rust: &R) -> P
See RustType::into_proto.
impl<T> WithSubscriber for T
fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where
    S: Into<Dispatch>,
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
fn with_current_subscriber(self) -> WithDispatch<Self>
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.