Worker-local state for storage timely instances.
One instance of a Worker, along with its contained StorageState, is
part of an ensemble of storage workers that all run inside the same timely
cluster. We call this worker a storage worker to disambiguate it from
other kinds of workers, such as those of other components that might share
the same timely cluster.
§Controller and internal communication
A worker receives external StorageCommands from the
storage controller via a channel. Storage workers also share an internal
control/command fabric (internal_control). Internal commands go through
a sequencer dataflow that ensures that all workers receive all commands in
the same consistent order.
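To make the ordering guarantee concrete, here is a minimal sketch that uses only `std` threads and channels, under the assumption that a single sequencer is enough to model the idea. Each simulated worker submits commands into one shared channel; a sequencer thread stamps them with consecutive sequence numbers in arrival order and forwards every stamped command to every worker. `run_sequenced` and `Sequenced` are hypothetical names, and the real fabric is a sequencer dataflow running in timely, not a set of threads.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical sequenced command: `seq` is its position in the single global order.
#[derive(Clone, Debug, PartialEq)]
pub struct Sequenced {
    pub seq: u64,
    pub origin: usize,
    pub payload: String,
}

/// Runs one simulated worker per entry in `proposals`. Every worker submits its
/// proposals to one sequencer, which broadcasts each stamped command to all
/// workers. Returns the commands each worker received, in receive order.
pub fn run_sequenced(proposals: Vec<Vec<String>>) -> Vec<Vec<Sequenced>> {
    let num_workers = proposals.len();
    // One shared submission channel into the sequencer.
    let (submit_tx, submit_rx) = mpsc::channel::<(usize, String)>();
    // One delivery channel per worker out of the sequencer.
    let mut deliver_txs = Vec::new();
    let mut deliver_rxs = Vec::new();
    for _ in 0..num_workers {
        let (tx, rx) = mpsc::channel::<Sequenced>();
        deliver_txs.push(tx);
        deliver_rxs.push(rx);
    }

    // Sequencer: assigns one total order and broadcasts it. The loop ends once
    // every submission sender is dropped; `deliver_txs` is then dropped with the
    // closure, which closes every worker's delivery channel.
    let sequencer = thread::spawn(move || {
        let mut next_seq = 0u64;
        for (origin, payload) in submit_rx {
            let cmd = Sequenced { seq: next_seq, origin, payload };
            next_seq += 1;
            for tx in &deliver_txs {
                tx.send(cmd.clone()).expect("worker hung up");
            }
        }
    });

    // Workers: submit concurrently, then collect everything the sequencer delivers.
    let mut handles = Vec::new();
    for (id, (props, rx)) in proposals.into_iter().zip(deliver_rxs).enumerate() {
        let tx = submit_tx.clone();
        handles.push(thread::spawn(move || {
            for p in props {
                tx.send((id, p)).expect("sequencer hung up");
            }
            drop(tx);
            rx.iter().collect::<Vec<Sequenced>>()
        }));
    }
    // Drop the original sender so only the workers' clones keep the sequencer alive.
    drop(submit_tx);

    let received: Vec<Vec<Sequenced>> =
        handles.into_iter().map(|h| h.join().unwrap()).collect();
    sequencer.join().unwrap();
    received
}
```

Submissions from different workers interleave nondeterministically, but whichever interleaving occurs, every worker observes the same one.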
We need to make sure that commands that cause dataflows to be rendered are processed in the same consistent order across all workers, because timely requires every worker to construct its dataflows in the same order. To achieve this, we make sure that only internal commands can cause dataflows to be rendered. External commands from the controller instead cause internal commands to be broadcast (by only one worker), and those internal commands get the dataflows rendered.
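The rule that only internal commands render dataflows can be sketched as a single-threaded toy with hypothetical types. Choosing worker 0 as the broadcasting worker is an assumption of this sketch, and the shared `log` vector stands in for the sequenced internal fabric.

```rust
/// Hypothetical external command; every worker receives its own copy.
#[derive(Clone, Debug, PartialEq)]
pub enum ExternalCommand {
    RunIngestion { id: u64 },
}

/// Hypothetical internal command, delivered to all workers in one order.
#[derive(Clone, Debug, PartialEq)]
pub enum InternalCommand {
    CreateIngestionDataflow { id: u64 },
}

/// Assumption for this sketch: worker 0 is the one that broadcasts.
const BROADCASTING_WORKER: usize = 0;

pub struct ToyWorker {
    pub index: usize,
    /// Dataflows rendered on this worker, in render order.
    pub rendered: Vec<u64>,
}

impl ToyWorker {
    /// External commands never render. Only the broadcasting worker turns one
    /// into an internal command; every other worker returns `None`.
    pub fn handle_external(&self, cmd: &ExternalCommand) -> Option<InternalCommand> {
        match cmd {
            ExternalCommand::RunIngestion { id } => {
                if self.index == BROADCASTING_WORKER {
                    Some(InternalCommand::CreateIngestionDataflow { id: *id })
                } else {
                    None
                }
            }
        }
    }

    /// Internal commands arrive in the same order everywhere, so rendering here is safe.
    pub fn handle_internal(&mut self, cmd: &InternalCommand) {
        match cmd {
            InternalCommand::CreateIngestionDataflow { id } => self.rendered.push(*id),
        }
    }
}

/// Every worker sees every external command; rendering follows the shared log only.
pub fn drive(num_workers: usize, external: &[ExternalCommand]) -> Vec<ToyWorker> {
    let mut workers: Vec<ToyWorker> = (0..num_workers)
        .map(|index| ToyWorker { index, rendered: Vec::new() })
        .collect();
    // Stand-in for the sequencer: one shared, ordered log of internal commands.
    let mut log = Vec::new();
    for cmd in external {
        for w in workers.iter() {
            if let Some(internal) = w.handle_external(cmd) {
                log.push(internal);
            }
        }
    }
    for w in workers.iter_mut() {
        for internal in &log {
            w.handle_internal(internal);
        }
    }
    workers
}
```

If every worker broadcast on receiving the external command, each dataflow would be requested once per worker; routing through a single broadcaster yields exactly one internal command per external command.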
The internal command fabric is also used to broadcast messages from a local operator or worker to all workers, for example when an error requires tearing down and restarting a dataflow on all workers.
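A sketch of the error case, assuming a simplified fabric that is just an ordered log: an operator on one worker hits an error and, instead of restarting locally, broadcasts a hypothetical `RestartDataflow` command that every worker, including the reporting one, then applies. `Internal`, `Fabric`, and `ToyWorker` are illustrative names, not the real types.

```rust
use std::collections::BTreeMap;

/// Hypothetical internal command; the real fabric's message types differ.
#[derive(Clone, Debug, PartialEq)]
pub enum Internal {
    /// Tear down dataflow `id` everywhere and render it again.
    RestartDataflow { id: u64, reason: String },
}

/// Stand-in for the internal fabric: one ordered log that every worker drains in full.
#[derive(Default)]
pub struct Fabric {
    log: Vec<Internal>,
}

impl Fabric {
    /// Any worker, or any operator running on it, may broadcast.
    pub fn broadcast(&mut self, cmd: Internal) {
        self.log.push(cmd);
    }

    pub fn log(&self) -> &[Internal] {
        &self.log
    }
}

/// Hypothetical per-worker state: how many times each dataflow has been rendered.
#[derive(Default, Debug)]
pub struct ToyWorker {
    pub renders: BTreeMap<u64, u32>,
}

impl ToyWorker {
    pub fn render(&mut self, id: u64) {
        *self.renders.entry(id).or_insert(0) += 1;
    }

    /// Called from a local operator that hit an error. It does not restart the
    /// dataflow locally; it asks every worker, including itself, to do so.
    pub fn on_operator_error(&self, fabric: &mut Fabric, id: u64, err: &str) {
        fabric.broadcast(Internal::RestartDataflow { id, reason: err.to_string() });
    }

    pub fn handle_internal(&mut self, cmd: &Internal) {
        match cmd {
            Internal::RestartDataflow { id, .. } => {
                // Teardown is elided in this sketch; only the re-render is counted.
                if self.renders.contains_key(id) {
                    self.render(*id);
                }
            }
        }
    }
}
```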
§Async Storage Worker
The storage worker has a companion AsyncStorageWorker that must be used
when running code that requires async. This is needed because the timely
main loop is synchronous and cannot run async code.
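The split between a synchronous main loop and an async companion can be sketched with channels. To keep the example dependency-free, a plain `std` thread stands in for the async runtime a real companion would use, and `Request`, `Response`, `AsyncCompanion`, and `run_main_loop` are hypothetical names. The property being illustrated is that the main loop only enqueues requests and polls for responses with `try_recv`, so it never blocks waiting on async work.

```rust
use std::sync::mpsc::{self, Receiver, Sender, TryRecvError};
use std::thread;

/// Hypothetical request from the main loop to the companion.
#[derive(Debug)]
pub enum Request {
    UpdateFrontiers { id: u64 },
}

/// Hypothetical response from the companion back to the main loop.
#[derive(Debug, PartialEq)]
pub enum Response {
    FrontiersUpdated { id: u64, as_of: u64 },
}

pub struct AsyncCompanion {
    tx: Sender<Request>,
    rx: Receiver<Response>,
}

impl AsyncCompanion {
    /// Spawns the companion. A thread stands in for an async runtime here.
    pub fn spawn() -> Self {
        let (req_tx, req_rx) = mpsc::channel::<Request>();
        let (resp_tx, resp_rx) = mpsc::channel::<Response>();
        thread::spawn(move || {
            // Runs until the main loop drops its request sender.
            for req in req_rx {
                match req {
                    Request::UpdateFrontiers { id } => {
                        // Placeholder for slow, async-only work; the value is fake.
                        let as_of = id * 10;
                        if resp_tx.send(Response::FrontiersUpdated { id, as_of }).is_err() {
                            break;
                        }
                    }
                }
            }
        });
        AsyncCompanion { tx: req_tx, rx: resp_rx }
    }

    /// Non-blocking: enqueue work for the companion and return immediately.
    pub fn update_frontiers(&self, id: u64) {
        self.tx.send(Request::UpdateFrontiers { id }).expect("companion exited");
    }

    /// Non-blocking poll, meant to be called once per main-loop iteration.
    pub fn try_recv(&self) -> Option<Response> {
        match self.rx.try_recv() {
            Ok(resp) => Some(resp),
            Err(TryRecvError::Empty) => None,
            Err(TryRecvError::Disconnected) => panic!("async companion exited"),
        }
    }
}

/// Sketch of a synchronous main loop that never awaits.
pub fn run_main_loop(ids: &[u64]) -> Vec<Response> {
    let companion = AsyncCompanion::spawn();
    for &id in ids {
        companion.update_frontiers(id);
    }
    let mut done = Vec::new();
    while done.len() < ids.len() {
        // A real worker would step its timely dataflows here between polls.
        match companion.try_recv() {
            Some(resp) => done.push(resp),
            None => thread::yield_now(),
        }
    }
    done
}
```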
§Example flow of commands for RunIngestion
With external commands, internal commands, and the async worker,
understanding where and how commands from the controller are realized can
get complicated. We will follow the complete flow for RunIngestion, as an
example:
- The worker receives a StorageCommand::RunIngestion command from the controller.
- This command is processed in StorageState::handle_storage_command. This step cannot render dataflows, because it does not have access to the timely worker. It only sets up state that persists for the whole lifetime of the source, such as the reported_frontier. Putting this reported frontier in place enables frontier reporting for that source. We do not start reporting when we only see an internal command for rendering a dataflow, because that internal command can “overtake” the external RunIngestion command.
- During processing of that command, we call AsyncStorageWorker::update_ingestion_frontiers, which causes a command to be sent to the async worker.
- We eventually get a response from the async worker: AsyncStorageWorkerResponse::IngestionFrontiersUpdated.
- This response is handled in Worker::handle_async_worker_response.
- Handling that response causes an InternalStorageCommand::CreateIngestionDataflow to be broadcast to all workers via the internal command fabric.
- This message will be processed (on each worker) in Worker::handle_internal_storage_command. This is what causes the required dataflow to be rendered on all workers.
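The flow above can be condensed into a single-worker toy model. The enum variant and method names mirror the ones in the text, but all types, fields, and signatures here are invented for illustration, and two `VecDeque`s stand in for the channel to the async worker and for the internal command fabric. In a real multi-worker cluster the internal command would arrive through the sequenced fabric, possibly broadcast by another worker.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Toy stand-ins: only the variant names are taken from the text above.
#[derive(Clone, Debug, PartialEq)]
pub enum StorageCommand {
    RunIngestion { id: u64 },
}

#[derive(Clone, Debug, PartialEq)]
pub enum AsyncRequest {
    UpdateIngestionFrontiers { id: u64 },
}

#[derive(Clone, Debug, PartialEq)]
pub enum AsyncResponse {
    IngestionFrontiersUpdated { id: u64 },
}

#[derive(Clone, Debug, PartialEq)]
pub enum InternalStorageCommand {
    CreateIngestionDataflow { id: u64 },
}

#[derive(Default)]
pub struct ToyWorker {
    /// Sources this worker reports frontiers for; only the external command adds entries.
    pub reported_frontiers: BTreeMap<u64, u64>,
    /// Dataflows rendered so far; only internal commands add entries.
    pub rendered: Vec<u64>,
    /// Stand-in for the channel to the async worker.
    pub to_async: VecDeque<AsyncRequest>,
    /// Stand-in for the (already sequenced) internal command fabric.
    pub internal: VecDeque<InternalStorageCommand>,
}

impl ToyWorker {
    /// Bullets 1 to 3: record long-lived state and ask the async worker for
    /// frontiers. No dataflow is rendered here.
    pub fn handle_storage_command(&mut self, cmd: StorageCommand) {
        match cmd {
            StorageCommand::RunIngestion { id } => {
                self.reported_frontiers.insert(id, 0);
                self.to_async.push_back(AsyncRequest::UpdateIngestionFrontiers { id });
            }
        }
    }

    /// Bullet 4: stand-in for the async worker answering every pending request.
    pub fn poll_async_worker(&mut self) -> Vec<AsyncResponse> {
        let mut out = Vec::new();
        while let Some(req) = self.to_async.pop_front() {
            match req {
                AsyncRequest::UpdateIngestionFrontiers { id } => {
                    out.push(AsyncResponse::IngestionFrontiersUpdated { id })
                }
            }
        }
        out
    }

    /// Bullets 5 and 6: the response leads to a broadcast internal command.
    pub fn handle_async_worker_response(&mut self, resp: AsyncResponse) {
        match resp {
            AsyncResponse::IngestionFrontiersUpdated { id } => self
                .internal
                .push_back(InternalStorageCommand::CreateIngestionDataflow { id }),
        }
    }

    /// Bullet 7: the only place a dataflow gets rendered.
    pub fn handle_internal_storage_command(&mut self, cmd: InternalStorageCommand) {
        match cmd {
            InternalStorageCommand::CreateIngestionDataflow { id } => self.rendered.push(id),
        }
    }

    /// Returns whether a frontier update for `id` is reported.
    pub fn report_frontier(&mut self, id: u64, frontier: u64) -> bool {
        match self.reported_frontiers.get_mut(&id) {
            Some(f) => {
                *f = frontier;
                true
            }
            None => false,
        }
    }
}
```

Calling `handle_storage_command`, then `poll_async_worker`, then `handle_async_worker_response`, and finally draining `internal` through `handle_internal_storage_command` renders the ingestion and leaves `report_frontier` returning `true` for it. Calling `handle_internal_storage_command` directly for an id that never saw `RunIngestion` still renders a dataflow, but `report_frontier` returns `false`, which models the “overtake” case.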
The process described above assumes that the RunIngestion is not an
update, i.e. it is in response to a CREATE SOURCE-like statement.
The primary distinction when handling a RunIngestion that represents an
update is that it might fill out new internal state in the mid-level
clients on the way toward being run.
Modules§
- async_storage_worker
- A friendly companion async worker that can be used by a timely storage worker to do work that requires async.
Structs§
- StorageInstanceContext
- Extra context for a storage instance. This is extra information that is used when rendering sources and sinks and that is not tied to the source/connection configuration itself.
- StorageState 
- Worker-local state related to the ingress or egress of collections of data.
- Worker
- State maintained for each worker thread.