async fn build_kafka(
    builder: KafkaSinkConnectionBuilder,
    connection_context: ConnectionContext
) -> Result<StorageSinkConnection, Error>