1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
1718//! Types used to send and receive bytes over an I/O channel.
19//!
20//! The core types are the `TReadTransport`, `TWriteTransport` and the
21//! `TIoChannel` traits, through which `TInputProtocol` or
//! `TOutputProtocol` can receive and send primitives over the wire. While
//! `TInputProtocol` and `TOutputProtocol` instances deal with language primitives,
//! the types in this module understand only bytes.
2526use std::io;
27use std::io::{Read, Write};
28use std::ops::{Deref, DerefMut};
/// Test-only assertion on the number of bytes written to a transport's
/// underlying channel.
///
/// The first argument names a value with a `channel` field; the length of
/// that channel's `write_bytes()` result is compared with the second
/// argument using `assert_eq!`.
#[cfg(test)]
macro_rules! assert_eq_transport_num_written_bytes {
    ($t:ident, $expected_len:expr) => {{
        let written = $t.channel.write_bytes();
        assert_eq!(written.len(), $expected_len);
    }};
}
/// Test-only assertion on the exact bytes written to a transport's
/// underlying channel.
///
/// The first argument names a value with a `channel` field; that channel's
/// `write_bytes()` result is compared with a reference to the second
/// argument using `assert_eq!`.
#[cfg(test)]
macro_rules! assert_eq_transport_written_bytes {
    ($t:ident, $expected:ident) => {{
        let written = $t.channel.write_bytes();
        assert_eq!(written, &$expected);
    }};
}
4344mod buffered;
45mod framed;
46mod mem;
47mod socket;
4849pub use self::buffered::{
50 TBufferedReadTransport, TBufferedReadTransportFactory, TBufferedWriteTransport,
51 TBufferedWriteTransportFactory,
52};
53pub use self::framed::{
54 TFramedReadTransport, TFramedReadTransportFactory, TFramedWriteTransport,
55 TFramedWriteTransportFactory,
56};
57pub use self::mem::TBufferChannel;
58pub use self::socket::TTcpChannel;
/// Marker for the transport a `TInputProtocol` receives its bytes from.
///
/// The trait declares no methods of its own; its only requirement is `Read`.
pub trait TReadTransport
where
    Self: Read,
{
}
6263/// Helper type used by a server to create `TReadTransport` instances for
64/// accepted client connections.
65pub trait TReadTransportFactory {
66/// Create a `TTransport` that wraps a channel over which bytes are to be read.
67fn create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send>;
68}
/// Marker for the transport a `TOutputProtocol` sends its bytes through.
///
/// The trait declares no methods of its own; its only requirement is `Write`.
pub trait TWriteTransport
where
    Self: Write,
{
}
7273/// Helper type used by a server to create `TWriteTransport` instances for
74/// accepted client connections.
75pub trait TWriteTransportFactory {
76/// Create a `TTransport` that wraps a channel over which bytes are to be sent.
77fn create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send>;
78}
7980impl<T> TReadTransport for T where T: Read {}
8182impl<T> TWriteTransport for T where T: Write {}
8384// FIXME: implement the Debug trait for boxed transports
8586impl<T> TReadTransportFactory for Box<T>
87where
88T: TReadTransportFactory + ?Sized,
89{
90fn create(&self, channel: Box<dyn Read + Send>) -> Box<dyn TReadTransport + Send> {
91 (**self).create(channel)
92 }
93}
9495impl<T> TWriteTransportFactory for Box<T>
96where
97T: TWriteTransportFactory + ?Sized,
98{
99fn create(&self, channel: Box<dyn Write + Send>) -> Box<dyn TWriteTransport + Send> {
100 (**self).create(channel)
101 }
102}
103104/// Identifies a splittable bidirectional I/O channel used to send and receive bytes.
105pub trait TIoChannel: Read + Write {
106/// Split the channel into a readable half and a writable half, where the
107 /// readable half implements `io::Read` and the writable half implements
108 /// `io::Write`. Returns `None` if the channel was not initialized, or if it
109 /// cannot be split safely.
110 ///
111 /// Returned halves may share the underlying OS channel or buffer resources.
112 /// Implementations **should ensure** that these two halves can be safely
113 /// used independently by concurrent threads.
114fn split(
115self,
116 ) -> crate::Result<(
117crate::transport::ReadHalf<Self>,
118crate::transport::WriteHalf<Self>,
119 )>
120where
121Self: Sized;
122}
/// Read side of a channel, as returned from `TIoChannel::split`.
///
/// Wraps a single readable handle of type `C`.
#[derive(Debug)]
pub struct ReadHalf<C: Read> {
    handle: C,
}
/// Write side of a channel, as returned from `TIoChannel::split`.
///
/// Wraps a single writable handle of type `C`.
#[derive(Debug)]
pub struct WriteHalf<C: Write> {
    handle: C,
}
141142impl<C> ReadHalf<C>
143where
144C: Read,
145{
146/// Create a `ReadHalf` associated with readable `handle`
147pub fn new(handle: C) -> ReadHalf<C> {
148 ReadHalf { handle }
149 }
150}
151152impl<C> WriteHalf<C>
153where
154C: Write,
155{
156/// Create a `WriteHalf` associated with writable `handle`
157pub fn new(handle: C) -> WriteHalf<C> {
158 WriteHalf { handle }
159 }
160}
161162impl<C> Read for ReadHalf<C>
163where
164C: Read,
165{
166fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
167self.handle.read(buf)
168 }
169}
170171impl<C> Write for WriteHalf<C>
172where
173C: Write,
174{
175fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
176self.handle.write(buf)
177 }
178179fn flush(&mut self) -> io::Result<()> {
180self.handle.flush()
181 }
182}
183184impl<C> Deref for ReadHalf<C>
185where
186C: Read,
187{
188type Target = C;
189190fn deref(&self) -> &Self::Target {
191&self.handle
192 }
193}
194195impl<C> DerefMut for ReadHalf<C>
196where
197C: Read,
198{
199fn deref_mut(&mut self) -> &mut C {
200&mut self.handle
201 }
202}
203204impl<C> Deref for WriteHalf<C>
205where
206C: Write,
207{
208type Target = C;
209210fn deref(&self) -> &Self::Target {
211&self.handle
212 }
213}
214215impl<C> DerefMut for WriteHalf<C>
216where
217C: Write,
218{
219fn deref_mut(&mut self) -> &mut C {
220&mut self.handle
221 }
222}
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    // Most of these tests assert nothing at runtime: they pass simply by
    // compiling, which shows that concrete and boxed `Read`/`Write` values
    // can be wrapped in buffered transports and used as transports.

    /// Three-byte in-memory reader used by the read-side tests.
    fn sample_reader() -> Cursor<[u8; 3]> {
        Cursor::new([0, 1, 2])
    }

    /// Ten zeroed bytes used as the sink by the write-side tests.
    fn sample_writer() -> Vec<u8> {
        vec![0u8; 10]
    }

    /// Accepts anything usable as a `TReadTransport` and calls a `Read`
    /// method on it.
    fn takes_read_transport<R: TReadTransport>(t: &mut R) {
        t.bytes();
    }

    /// Accepts anything usable as a `TWriteTransport` and flushes it.
    fn takes_write_transport<W: TWriteTransport>(t: &mut W) {
        t.flush().unwrap();
    }

    #[test]
    fn must_create_usable_read_channel_from_concrete_read_type() {
        let _ = TBufferedReadTransport::new(sample_reader());
    }

    #[test]
    fn must_create_usable_read_channel_from_boxed_read() {
        let boxed: Box<dyn Read> = Box::new(sample_reader());
        let _ = TBufferedReadTransport::new(boxed);
    }

    #[test]
    fn must_create_usable_write_channel_from_concrete_write_type() {
        let _ = TBufferedWriteTransport::new(sample_writer());
    }

    #[test]
    fn must_create_usable_write_channel_from_boxed_write() {
        let boxed: Box<dyn Write> = Box::new(sample_writer());
        let _ = TBufferedWriteTransport::new(boxed);
    }

    #[test]
    fn must_create_usable_read_transport_from_concrete_read_transport() {
        let mut transport = TBufferedReadTransport::new(sample_reader());
        takes_read_transport(&mut transport);
    }

    #[test]
    fn must_create_usable_read_transport_from_boxed_read() {
        let mut transport: Box<dyn TReadTransport> =
            Box::new(TBufferedReadTransport::new(sample_reader()));
        takes_read_transport(&mut transport);
    }

    #[test]
    fn must_create_usable_write_transport_from_concrete_write_transport() {
        let mut transport = TBufferedWriteTransport::new(sample_writer());
        takes_write_transport(&mut transport);
    }

    #[test]
    fn must_create_usable_write_transport_from_boxed_write() {
        let mut transport: Box<dyn TWriteTransport> =
            Box::new(TBufferedWriteTransport::new(sample_writer()));
        takes_write_transport(&mut transport);
    }
}