Crate coordtest

coordtest attempts to be a deterministic Coordinator tester using datadriven test files. Many parts of dataflow run in their own tokio task, and we aren’t able to observe their output until we bump a timestamp and see if new data are available. For example, the FILE source on macOS polls every 100ms, which makes it difficult to write a fully deterministic test. Instead, coordtest attempts to provide enough knobs that it can test interesting cases without race conditions.

The coordinator is started with logical compaction disabled. To enable compaction on an index (allowing its since to advance past 0), use ALTER INDEX.

The following datadriven directives are supported:

  • sql: Executes the SQL using transaction rules similar to the simple pgwire protocol in a new session (that is, multiple sql directives are not in the same session). Output is formatted ExecuteResponse. The input can contain the string <TEMP> which will be replaced with a temporary directory.
  • wait-sql: Executes all SQL in a retry loop until all datums returned (all columns in all rows in all statements) are true, panicking if that does not happen within a 5s timeout. Prior to each attempt, all pending feedback messages from the dataflow server are sent to the Coordinator. Messages for specified items can be skipped (but requeued) by specifying exclude-uppers=database.schema.item as an argument. After each failed attempt, the timestamp is incremented by 1 to give any new data an opportunity to be observed.
  • async-sql: Requires a session=name argument. Creates a named session, and executes the provided statements similarly to sql, except that the results are not immediately returned. Instead, await the results using the await-sql directive naming this session. The input can contain the string <TEMP> which will be replaced with a temporary directory when returned with await-sql. No output.
  • async-cancel: Requires a session=name argument. Cancels the named session. No output.
  • await-sql: Requires a session=name argument. Awaits the results of the named session. Output is formatted ExecuteResponse. If the input to the awaited session contained the string <TEMP>, it will be replaced with a temporary directory.
  • update-upper: Sends a batch of FrontierUppers to the Coordinator. Input is one update per line of the format database.schema.item N where N is some numeric timestamp. No output.
  • inc-timestamp: Increments the timestamp by the number given in the input. No output.
  • create-file: Requires a name=filename argument. Creates and truncates a file with the specified name in the TEMP directory with contents as the input. No output.
  • append-file: Same as create-file, but appends.
  • print-catalog: Outputs the catalog. Generally for debugging.
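
The retry behavior described for wait-sql can be illustrated with a minimal sketch. It is not the crate's implementation: `Harness`, `wait_until_true`, and the closure standing in for "run the SQL and check that every datum is true" are hypothetical names introduced here, and the step that forwards pending feedback messages before each attempt is omitted.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the test state that wait-sql manipulates.
struct Harness {
    timestamp: u64,
}

/// Retries `attempt` until it returns true, incrementing the timestamp by 1
/// after each failed attempt. Panics once `timeout` has elapsed.
fn wait_until_true<F>(harness: &mut Harness, timeout: Duration, mut attempt: F)
where
    F: FnMut(&Harness) -> bool,
{
    let start = Instant::now();
    loop {
        if attempt(harness) {
            return;
        }
        if start.elapsed() >= timeout {
            panic!("wait-sql timed out after {:?}", timeout);
        }
        // Give any new data an opportunity to be observed on the next attempt.
        harness.timestamp += 1;
    }
}

fn main() {
    let mut harness = Harness { timestamp: 0 };
    // Assumption for illustration: the query only yields all-true datums
    // once the timestamp has reached 3.
    wait_until_true(&mut harness, Duration::from_secs(5), |h| h.timestamp >= 3);
    println!("succeeded at timestamp {}", harness.timestamp);
}
```

Advancing the timestamp only after a failed attempt keeps the loop deterministic: the number of increments depends on when the condition first holds, not on wall-clock timing.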


CoordTest works by creating a Coordinator with mechanisms to control when it receives messages. The dataflow server is started with a single worker, but its feedback channel is controlled by the InterceptingDataflowClient, allowing us to control when uppers and sinces advance.
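
The interception idea can be sketched as a buffer that holds feedback until the test decides to release it. The sketch below is an assumption-laden illustration, not the actual InterceptingDataflowClient: the `Interceptor` and `FrontierUpper` types, their fields, and the `intercept` and `drain` methods are all hypothetical. It also shows one plausible reading of how exclude-uppers could skip messages for an item while leaving them queued.

```rust
use std::collections::VecDeque;

/// Hypothetical feedback message: a new upper for a named item.
#[derive(Debug, Clone, PartialEq)]
struct FrontierUpper {
    item: String,
    upper: u64,
}

/// Buffers feedback from the dataflow server until the test releases it.
#[derive(Default)]
struct Interceptor {
    pending: VecDeque<FrontierUpper>,
}

impl Interceptor {
    /// Records a message from the server without forwarding it.
    fn intercept(&mut self, msg: FrontierUpper) {
        self.pending.push_back(msg);
    }

    /// Releases all pending messages except those for excluded items,
    /// which remain queued in their original order.
    fn drain(&mut self, exclude: &[&str]) -> Vec<FrontierUpper> {
        let (held, released): (Vec<_>, Vec<_>) = self
            .pending
            .drain(..)
            .partition(|m| exclude.iter().any(|e| *e == m.item));
        self.pending = held.into();
        released
    }
}

fn main() {
    let mut client = Interceptor::default();
    client.intercept(FrontierUpper { item: "materialize.public.a".into(), upper: 5 });
    client.intercept(FrontierUpper { item: "materialize.public.b".into(), upper: 7 });

    // Only the message for `a` is released; the one for `b` stays queued.
    let released = client.drain(&["materialize.public.b"]);
    println!("released: {:?}", released);
    println!("still pending: {:?}", client.pending);
}
```

Because nothing reaches the Coordinator until `drain` is called, the test script, rather than task scheduling, determines the order in which frontier updates are observed.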

A dataflow_types::client::Client implementation that intercepts responses from the dataflow server.