
Worker-local state for storage timely instances.

One instance of a Worker, along with its contained StorageState, is part of an ensemble of storage workers that all run inside the same timely cluster. We call this worker a storage worker to distinguish it from other kinds of workers that other components sharing the same timely cluster might run.

§Controller and internal communication

A worker receives external StorageCommands from the storage controller, via a channel. Storage workers also share an internal control/command fabric (internal_control). Internal commands go through a Sequencer dataflow that ensures that all workers receive all commands in the same consistent order.

Commands that cause dataflows to be rendered must be processed in the same consistent order across all workers, because timely requires this. To achieve this, only internal commands are allowed to cause dataflows to be rendered; external commands (from the controller) instead cause internal commands to be broadcast (by only one worker), and those internal commands get the dataflows rendered.

The internal command fabric is also used to broadcast messages from a local operator/worker to all workers. For example, when an error is encountered, we use it to tell all workers to tear down and restart the affected dataflow.
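
To make the ordering guarantee concrete, here is a minimal, self-contained sketch of the idea using plain std channels in place of the real Sequencer dataflow and internal_control fabric; InternalCmd, run_sequencer, and everything else in it are illustrative stand-ins, not this crate's types.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

#[derive(Clone, Debug)]
enum InternalCmd {
    CreateIngestionDataflow { id: u64 },
}

/// Fans every submitted command out to all workers in one global order.
fn run_sequencer(submissions: Receiver<InternalCmd>, workers: Vec<Sender<InternalCmd>>) {
    for cmd in submissions {
        // A single thread performs the fan-out, so every worker sees the
        // commands in exactly the same order.
        for tx in &workers {
            let _ = tx.send(cmd.clone());
        }
    }
}

fn main() {
    let (submit_tx, submit_rx) = channel();
    let (worker_txs, worker_rxs): (Vec<_>, Vec<_>) = (0..2).map(|_| channel()).unzip();

    thread::spawn(move || run_sequencer(submit_rx, worker_txs));

    // Only one worker turns an external command into an internal one...
    submit_tx
        .send(InternalCmd::CreateIngestionDataflow { id: 42 })
        .unwrap();
    drop(submit_tx); // close the fabric so all threads terminate

    // ...but every worker handles it, e.g. by rendering the dataflow.
    let handles: Vec<_> = worker_rxs
        .into_iter()
        .enumerate()
        .map(|(idx, rx)| {
            thread::spawn(move || {
                for cmd in rx {
                    println!("worker {idx} handles {cmd:?}");
                }
            })
        })
        .collect();

    for h in handles {
        let _ = h.join();
    }
}
```

In the real worker the fan-out is a Sequencer dataflow rather than a dedicated thread, but the invariant is the same: the order is decided in exactly one place, so every worker can render dataflows in an identical sequence.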

§Async Storage Worker

The storage worker has a companion AsyncStorageWorker that must be used when running code that requires async. This is needed because a timely main loop cannot run async code.
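
Here is a minimal sketch of that companion pattern, assuming tokio as the async runtime; the Request and Response enums are invented for illustration and are not the real AsyncStorageWorker command or response types. The structural point is that the synchronous main loop hands work to a dedicated thread driving its own runtime and later picks up results with a non-blocking try_recv, so it never awaits anything itself.

```rust
use std::sync::mpsc::{channel, TryRecvError};
use std::thread;
use std::time::Duration;

enum Request {
    UpdateFrontiers { id: u64 },
}

enum Response {
    FrontiersUpdated { id: u64 },
}

fn main() {
    // Requests flow into the async side (send is synchronous, recv is async);
    // responses flow back over a plain std channel the sync loop can poll.
    let (req_tx, mut req_rx) = tokio::sync::mpsc::unbounded_channel::<Request>();
    let (resp_tx, resp_rx) = channel::<Response>();

    // The async companion: a dedicated thread driving its own runtime, so the
    // timely worker never has to block on or poll a future itself.
    thread::spawn(move || {
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("build runtime");
        rt.block_on(async move {
            while let Some(Request::UpdateFrontiers { id }) = req_rx.recv().await {
                // Placeholder for real async work, e.g. looking up frontiers.
                tokio::time::sleep(Duration::from_millis(10)).await;
                let _ = resp_tx.send(Response::FrontiersUpdated { id });
            }
        });
    });

    req_tx.send(Request::UpdateFrontiers { id: 7 }).expect("send request");

    // The timely main loop: step dataflows, then drain whatever responses are
    // ready, never blocking on the companion.
    loop {
        match resp_rx.try_recv() {
            Ok(Response::FrontiersUpdated { id }) => {
                println!("frontiers updated for ingestion {id}");
                break;
            }
            // Nothing ready yet: in the real loop we would keep stepping
            // timely dataflows here instead of sleeping.
            Err(TryRecvError::Empty) => thread::sleep(Duration::from_millis(1)),
            Err(TryRecvError::Disconnected) => break,
        }
    }
}
```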

§Example flow of commands for RunIngestions

With external commands, internal commands, and the async worker, understanding where and how commands from the controller are realized can get complicated. As an example, we follow the complete flow for RunIngestions (a condensed code sketch of this flow appears at the end of this section):

  1. Worker receives a StorageCommand::RunIngestions command from the controller.
  2. This command is processed in StorageState::handle_storage_command. This step cannot render dataflows, because it does not have access to the timely worker. It only sets up state that persists for the whole lifetime of the source, such as the reported_frontier. Putting this reported frontier in place enables frontier reporting for that source. We do not start reporting based only on an internal command for rendering a dataflow, because such a command can “overtake” the external RunIngestions command.
  3. During processing of that command, we call AsyncStorageWorker::update_frontiers, which causes a command to be sent to the async worker.
  4. We eventually get a response from the async worker: AsyncStorageWorkerResponse::FrontiersUpdated.
  5. This response is handled in Worker::handle_async_worker_response.
  6. Handling that response causes an InternalStorageCommand::CreateIngestionDataflow to be broadcast to all workers via the internal command fabric.
  7. This message will be processed (on each worker) in Worker::handle_internal_storage_command. This is what will cause the required dataflow to be rendered on all workers.

The process described above assumes that the RunIngestions command is not an update, i.e. that it is in response to a CREATE SOURCE-like statement.

The primary distinction when handling a RunIngestions that represents an update is that it might fill out new internal state in the mid-level clients on the way toward being run.
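
Below is a compilable, heavily simplified sketch of steps 1 through 7 for the non-update case. All types and method names are stand-ins, the async worker round-trip and the broadcast over the internal fabric are collapsed into direct calls, and the real handlers on Worker and StorageState carry far more state.

```rust
use std::collections::BTreeMap;

enum StorageCommand {
    RunIngestions { id: u64 },
}

enum AsyncResponse {
    FrontiersUpdated { id: u64, as_of: u64 },
}

enum InternalCommand {
    CreateIngestionDataflow { id: u64, as_of: u64 },
}

#[derive(Default)]
struct StorageState {
    // Step 2: long-lived per-source state, e.g. the reported frontier.
    reported_frontiers: BTreeMap<u64, u64>,
}

impl StorageState {
    /// Steps 1-3: record per-source state and ask the async worker for
    /// frontier information; no dataflow is rendered here.
    fn handle_storage_command(&mut self, cmd: StorageCommand) -> AsyncResponse {
        match cmd {
            StorageCommand::RunIngestions { id } => {
                self.reported_frontiers.insert(id, 0);
                // Stand-in for AsyncStorageWorker::update_frontiers and the
                // response it eventually produces (steps 3-4).
                AsyncResponse::FrontiersUpdated { id, as_of: 0 }
            }
        }
    }

    /// Steps 5-6: turn the async worker's response into an internal command
    /// that would be broadcast to every worker over the internal fabric.
    fn handle_async_worker_response(&mut self, resp: AsyncResponse) -> InternalCommand {
        match resp {
            AsyncResponse::FrontiersUpdated { id, as_of } => {
                InternalCommand::CreateIngestionDataflow { id, as_of }
            }
        }
    }

    /// Step 7: every worker handles the broadcast command and renders the
    /// dataflow.
    fn handle_internal_storage_command(&mut self, cmd: InternalCommand) {
        match cmd {
            InternalCommand::CreateIngestionDataflow { id, as_of } => {
                // Frontier reporting was already enabled by the external command.
                assert!(self.reported_frontiers.contains_key(&id));
                println!("rendering dataflow for ingestion {id} as of {as_of}");
            }
        }
    }
}

fn main() {
    let mut state = StorageState::default();
    let resp = state.handle_storage_command(StorageCommand::RunIngestions { id: 42 });
    let internal = state.handle_async_worker_response(resp);
    state.handle_internal_storage_command(internal);
}
```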

Modules§

  • A friendly companion async worker that can be used by a timely storage worker to do work that requires async.

Structs§

  • This maintains an additional read hold on the source data for a sink, alongside the controller’s hold and the handle used to read the shard internally. This is useful because environmentd’s hold might expire, and the handle we use to read advances ahead of what we’ve successfully committed. In theory this could be stored alongside the other sink data, but this isn’t intended to be a long term solution; either this should become a “critical” handle or environmentd will learn to hold its handles across restarts and this won’t be needed.
  • Extra context for a storage instance. This is extra information used when rendering sources and sinks that is not tied to the source/connection configuration itself.
  • Worker-local state related to the ingress or egress of collections of data.
  • State maintained for each worker thread.