thrift/transport/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Types used to send and receive bytes over an I/O channel.
19//!
20//! The core types are the `TReadTransport`, `TWriteTransport` and the
21//! `TIoChannel` traits, through which `TInputProtocol` or
//! `TOutputProtocol` can receive and send primitives over the wire. While
//! `TInputProtocol` and `TOutputProtocol` instances deal with language
//! primitives, the types in this module understand only bytes.
25
26use std::io;
27use std::io::{Read, Write};
28use std::ops::{Deref, DerefMut};
29
// Test-only helper: asserts that the length of whatever
// `$transport.channel.write_bytes()` returns equals `$num_written_bytes`.
//
// `$transport` must be an identifier naming a value with a `channel` field.
#[cfg(test)]
macro_rules! assert_eq_transport_num_written_bytes {
    ($transport:ident, $num_written_bytes:expr) => {{
        let written = $transport.channel.write_bytes();
        assert_eq!(written.len(), $num_written_bytes);
    }};
}
36
// Test-only helper: asserts that whatever `$transport.channel.write_bytes()`
// returns compares equal to a reference to `$expected_bytes`.
//
// Both arguments must be plain identifiers.
#[cfg(test)]
macro_rules! assert_eq_transport_written_bytes {
    ($transport:ident, $expected_bytes:ident) => {{
        let written = $transport.channel.write_bytes();
        assert_eq!(written, &$expected_bytes);
    }};
}
43
44mod buffered;
45mod framed;
46mod mem;
47mod socket;
48
49pub use self::buffered::{
50    TBufferedReadTransport, TBufferedReadTransportFactory, TBufferedWriteTransport,
51    TBufferedWriteTransportFactory,
52};
53pub use self::framed::{
54    TFramedReadTransport, TFramedReadTransportFactory, TFramedWriteTransport,
55    TFramedWriteTransportFactory,
56};
57pub use self::mem::TBufferChannel;
58pub use self::socket::TTcpChannel;
59
/// Marker trait for the byte source a `TInputProtocol` reads from.
///
/// It declares no methods of its own; its only requirement is `Read`.
pub trait TReadTransport: Read {}
62
63/// Helper type used by a server to create `TReadTransport` instances for
64/// accepted client connections.
65pub trait TReadTransportFactory {
66    /// Create a `TTransport` that wraps a channel over which bytes are to be read.
67    fn create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send>;
68}
69
/// Marker trait for the byte sink a `TOutputProtocol` writes to.
///
/// It declares no methods of its own; its only requirement is `Write`.
pub trait TWriteTransport: Write {}
72
73/// Helper type used by a server to create `TWriteTransport` instances for
74/// accepted client connections.
75pub trait TWriteTransportFactory {
76    /// Create a `TTransport` that wraps a channel over which bytes are to be sent.
77    fn create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send>;
78}
79
80impl<T> TReadTransport for T where T: Read {}
81
82impl<T> TWriteTransport for T where T: Write {}
83
84// FIXME: implement the Debug trait for boxed transports
85
86impl<T> TReadTransportFactory for Box<T>
87where
88    T: TReadTransportFactory + ?Sized,
89{
90    fn create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send> {
91        (**self).create(channel)
92    }
93}
94
95impl<T> TWriteTransportFactory for Box<T>
96where
97    T: TWriteTransportFactory + ?Sized,
98{
99    fn create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send> {
100        (**self).create(channel)
101    }
102}
103
104/// Identifies a splittable bidirectional I/O channel used to send and receive bytes.
105pub trait TIoChannel: Read + Write {
106    /// Split the channel into a readable half and a writable half, where the
107    /// readable half implements `io::Read` and the writable half implements
108    /// `io::Write`. Returns `None` if the channel was not initialized, or if it
109    /// cannot be split safely.
110    ///
111    /// Returned halves may share the underlying OS channel or buffer resources.
112    /// Implementations **should ensure** that these two halves can be safely
113    /// used independently by concurrent threads.
114    fn split(
115        self,
116    ) -> crate::Result<(
117        crate::transport::ReadHalf<Self>,
118        crate::transport::WriteHalf<Self>,
119    )>
120    where
121        Self: Sized;
122}
123
/// Read side of the pair produced by `TIoChannel::split`.
///
/// A thin wrapper that owns a single readable handle.
#[derive(Debug)]
pub struct ReadHalf<C: Read> {
    /// The wrapped readable handle.
    handle: C,
}
132
/// Write side of the pair produced by `TIoChannel::split`.
///
/// A thin wrapper that owns a single writable handle.
#[derive(Debug)]
pub struct WriteHalf<C: Write> {
    /// The wrapped writable handle.
    handle: C,
}
141
142impl<C> ReadHalf<C>
143where
144    C: Read,
145{
146    /// Create a `ReadHalf` associated with readable `handle`
147    pub fn new(handle: C) -> ReadHalf<C> {
148        ReadHalf { handle }
149    }
150}
151
152impl<C> WriteHalf<C>
153where
154    C: Write,
155{
156    /// Create a `WriteHalf` associated with writable `handle`
157    pub fn new(handle: C) -> WriteHalf<C> {
158        WriteHalf { handle }
159    }
160}
161
162impl<C> Read for ReadHalf<C>
163where
164    C: Read,
165{
166    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
167        self.handle.read(buf)
168    }
169}
170
171impl<C> Write for WriteHalf<C>
172where
173    C: Write,
174{
175    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
176        self.handle.write(buf)
177    }
178
179    fn flush(&mut self) -> io::Result<()> {
180        self.handle.flush()
181    }
182}
183
184impl<C> Deref for ReadHalf<C>
185where
186    C: Read,
187{
188    type Target = C;
189
190    fn deref(&self) -> &Self::Target {
191        &self.handle
192    }
193}
194
195impl<C> DerefMut for ReadHalf<C>
196where
197    C: Read,
198{
199    fn deref_mut(&mut self) -> &mut C {
200        &mut self.handle
201    }
202}
203
204impl<C> Deref for WriteHalf<C>
205where
206    C: Write,
207{
208    type Target = C;
209
210    fn deref(&self) -> &Self::Target {
211        &self.handle
212    }
213}
214
215impl<C> DerefMut for WriteHalf<C>
216where
217    C: Write,
218{
219    fn deref_mut(&mut self) -> &mut C {
220        &mut self.handle
221    }
222}
223
#[cfg(test)]
mod tests {
    //! Checks that concrete and boxed `Read`/`Write` values can be wrapped by
    //! the buffered transports, and that the results (concrete or boxed) are
    //! accepted where a `TReadTransport`/`TWriteTransport` is required.

    use std::io::Cursor;

    use super::*;

    /// Accepts only arguments usable as a `TReadTransport`, then calls a
    /// `Read` method on the argument.
    fn takes_read_transport<R: TReadTransport>(t: &mut R) {
        t.bytes();
    }

    /// Accepts only arguments usable as a `TWriteTransport`, then calls a
    /// `Write` method on the argument.
    fn takes_write_transport<W: TWriteTransport>(t: &mut W) {
        t.flush().unwrap();
    }

    #[test]
    fn must_create_usable_read_channel_from_concrete_read_type() {
        let source = Cursor::new([0, 1, 2]);
        let _ = TBufferedReadTransport::new(source);
    }

    #[test]
    fn must_create_usable_read_channel_from_boxed_read() {
        let source: Box<dyn Read> = Box::new(Cursor::new([0, 1, 2]));
        let _ = TBufferedReadTransport::new(source);
    }

    #[test]
    fn must_create_usable_write_channel_from_concrete_write_type() {
        let sink = vec![0u8; 10];
        let _ = TBufferedWriteTransport::new(sink);
    }

    #[test]
    fn must_create_usable_write_channel_from_boxed_write() {
        let sink: Box<dyn Write> = Box::new(vec![0u8; 10]);
        let _ = TBufferedWriteTransport::new(sink);
    }

    #[test]
    fn must_create_usable_read_transport_from_concrete_read_transport() {
        let source = Cursor::new([0, 1, 2]);
        let mut transport = TBufferedReadTransport::new(source);
        takes_read_transport(&mut transport);
    }

    #[test]
    fn must_create_usable_read_transport_from_boxed_read() {
        let source = Cursor::new([0, 1, 2]);
        let mut transport: Box<dyn TReadTransport> =
            Box::new(TBufferedReadTransport::new(source));
        takes_read_transport(&mut transport);
    }

    #[test]
    fn must_create_usable_write_transport_from_concrete_write_transport() {
        let sink = vec![0u8; 10];
        let mut transport = TBufferedWriteTransport::new(sink);
        takes_write_transport(&mut transport);
    }

    #[test]
    fn must_create_usable_write_transport_from_boxed_write() {
        let sink = vec![0u8; 10];
        let mut transport: Box<dyn TWriteTransport> =
            Box::new(TBufferedWriteTransport::new(sink));
        takes_write_transport(&mut transport);
    }
}