1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#![deny(missing_docs)]
use dataflow_types::client::{partitioned::Partitioned, Client, Command, Response};
pub struct RemoteClient {
client: Partitioned<tcp::TcpClient>,
}
impl RemoteClient {
pub async fn connect(addrs: &[impl tokio::net::ToSocketAddrs]) -> Result<Self, anyhow::Error> {
let mut remotes = Vec::with_capacity(addrs.len());
for addr in addrs.iter() {
remotes.push(tcp::TcpClient::connect(addr).await?);
}
Ok(Self {
client: Partitioned::new(remotes),
})
}
}
#[async_trait::async_trait]
impl Client for RemoteClient {
async fn send(&mut self, cmd: Command) {
tracing::trace!("Broadcasting dataflow command: {:?}", cmd);
self.client.send(cmd).await
}
async fn recv(&mut self) -> Option<Response> {
self.client.recv().await
}
}
pub mod tcp {
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio_serde::formats::Bincode;
use tokio_util::codec::LengthDelimitedCodec;
use dataflow_types::client::{Command, Response};
pub struct TcpClient {
connection: FramedClient<TcpStream>,
}
impl TcpClient {
pub async fn connect(addr: impl ToSocketAddrs) -> Result<TcpClient, anyhow::Error> {
let connection = framed_client(TcpStream::connect(addr).await?);
Ok(Self { connection })
}
}
#[async_trait::async_trait]
impl dataflow_types::client::Client for TcpClient {
async fn send(&mut self, cmd: dataflow_types::client::Command) {
self.connection
.send(cmd)
.await
.expect("worker command receiver should not drop first");
}
async fn recv(&mut self) -> Option<dataflow_types::client::Response> {
self.connection
.next()
.await
.map(|x| x.expect("connection to dataflow server broken"))
}
}
pub type Framed<C, T, U> = tokio_serde::Framed<
tokio_util::codec::Framed<C, LengthDelimitedCodec>,
T,
U,
Bincode<T, U>,
>;
pub type FramedServer<C> = Framed<C, Command, Response>;
pub type FramedClient<C> = Framed<C, Response, Command>;
fn length_delimited_codec() -> LengthDelimitedCodec {
let mut codec = LengthDelimitedCodec::new();
codec.set_max_frame_length(usize::MAX);
codec
}
pub fn framed_server<C>(conn: C) -> FramedServer<C>
where
C: AsyncRead + AsyncWrite,
{
tokio_serde::Framed::new(
tokio_util::codec::Framed::new(conn, length_delimited_codec()),
Bincode::default(),
)
}
pub fn framed_client<C>(conn: C) -> FramedClient<C>
where
C: AsyncRead + AsyncWrite,
{
tokio_serde::Framed::new(
tokio_util::codec::Framed::new(conn, length_delimited_codec()),
Bincode::default(),
)
}
}