thrift/protocol/
multiplexed.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::{
19    TFieldIdentifier, TListIdentifier, TMapIdentifier, TMessageIdentifier, TMessageType,
20    TOutputProtocol, TSetIdentifier, TStructIdentifier,
21};
22
23/// `TOutputProtocol` that prefixes the service name to all outgoing Thrift
24/// messages.
25///
26/// A `TMultiplexedOutputProtocol` should be used when multiple Thrift services
27/// send messages over a single I/O channel. By prefixing service identifiers
28/// to outgoing messages receivers are able to demux them and route them to the
29/// appropriate service processor. Rust receivers must use a `TMultiplexedProcessor`
30/// to process incoming messages, while other languages must use their
31/// corresponding multiplexed processor implementations.
32///
33/// For example, given a service `TestService` and a service call `test_call`,
34/// this implementation would identify messages as originating from
35/// `TestService:test_call`.
36///
37/// # Examples
38///
39/// Create and use a `TMultiplexedOutputProtocol`.
40///
41/// ```no_run
42/// use thrift::protocol::{TMessageIdentifier, TMessageType, TOutputProtocol};
43/// use thrift::protocol::{TBinaryOutputProtocol, TMultiplexedOutputProtocol};
44/// use thrift::transport::TTcpChannel;
45///
46/// let mut channel = TTcpChannel::new();
47/// channel.open("localhost:9090").unwrap();
48///
49/// let protocol = TBinaryOutputProtocol::new(channel, true);
50/// let mut protocol = TMultiplexedOutputProtocol::new("service_name", protocol);
51///
52/// let ident = TMessageIdentifier::new("svc_call", TMessageType::Call, 1);
53/// protocol.write_message_begin(&ident).unwrap();
54/// ```
55#[derive(Debug)]
56pub struct TMultiplexedOutputProtocol<P>
57where
58    P: TOutputProtocol,
59{
60    service_name: String,
61    inner: P,
62}
63
64impl<P> TMultiplexedOutputProtocol<P>
65where
66    P: TOutputProtocol,
67{
68    /// Create a `TMultiplexedOutputProtocol` that identifies outgoing messages
69    /// as originating from a service named `service_name` and sends them over
70    /// the `wrapped` `TOutputProtocol`. Outgoing messages are encoded and sent
71    /// by `wrapped`, not by this instance.
72    pub fn new(service_name: &str, wrapped: P) -> TMultiplexedOutputProtocol<P> {
73        TMultiplexedOutputProtocol {
74            service_name: service_name.to_owned(),
75            inner: wrapped,
76        }
77    }
78}
79
80// FIXME: avoid passthrough methods
81impl<P> TOutputProtocol for TMultiplexedOutputProtocol<P>
82where
83    P: TOutputProtocol,
84{
85    fn write_message_begin(&mut self, identifier: &TMessageIdentifier) -> crate::Result<()> {
86        match identifier.message_type {
87            // FIXME: is there a better way to override identifier here?
88            TMessageType::Call | TMessageType::OneWay => {
89                let identifier = TMessageIdentifier {
90                    name: format!("{}:{}", self.service_name, identifier.name),
91                    ..*identifier
92                };
93                self.inner.write_message_begin(&identifier)
94            }
95            _ => self.inner.write_message_begin(identifier),
96        }
97    }
98
99    fn write_message_end(&mut self) -> crate::Result<()> {
100        self.inner.write_message_end()
101    }
102
103    fn write_struct_begin(&mut self, identifier: &TStructIdentifier) -> crate::Result<()> {
104        self.inner.write_struct_begin(identifier)
105    }
106
107    fn write_struct_end(&mut self) -> crate::Result<()> {
108        self.inner.write_struct_end()
109    }
110
111    fn write_field_begin(&mut self, identifier: &TFieldIdentifier) -> crate::Result<()> {
112        self.inner.write_field_begin(identifier)
113    }
114
115    fn write_field_end(&mut self) -> crate::Result<()> {
116        self.inner.write_field_end()
117    }
118
119    fn write_field_stop(&mut self) -> crate::Result<()> {
120        self.inner.write_field_stop()
121    }
122
123    fn write_bytes(&mut self, b: &[u8]) -> crate::Result<()> {
124        self.inner.write_bytes(b)
125    }
126
127    fn write_bool(&mut self, b: bool) -> crate::Result<()> {
128        self.inner.write_bool(b)
129    }
130
131    fn write_i8(&mut self, i: i8) -> crate::Result<()> {
132        self.inner.write_i8(i)
133    }
134
135    fn write_i16(&mut self, i: i16) -> crate::Result<()> {
136        self.inner.write_i16(i)
137    }
138
139    fn write_i32(&mut self, i: i32) -> crate::Result<()> {
140        self.inner.write_i32(i)
141    }
142
143    fn write_i64(&mut self, i: i64) -> crate::Result<()> {
144        self.inner.write_i64(i)
145    }
146
147    fn write_double(&mut self, d: f64) -> crate::Result<()> {
148        self.inner.write_double(d)
149    }
150
151    fn write_string(&mut self, s: &str) -> crate::Result<()> {
152        self.inner.write_string(s)
153    }
154
155    fn write_list_begin(&mut self, identifier: &TListIdentifier) -> crate::Result<()> {
156        self.inner.write_list_begin(identifier)
157    }
158
159    fn write_list_end(&mut self) -> crate::Result<()> {
160        self.inner.write_list_end()
161    }
162
163    fn write_set_begin(&mut self, identifier: &TSetIdentifier) -> crate::Result<()> {
164        self.inner.write_set_begin(identifier)
165    }
166
167    fn write_set_end(&mut self) -> crate::Result<()> {
168        self.inner.write_set_end()
169    }
170
171    fn write_map_begin(&mut self, identifier: &TMapIdentifier) -> crate::Result<()> {
172        self.inner.write_map_begin(identifier)
173    }
174
175    fn write_map_end(&mut self) -> crate::Result<()> {
176        self.inner.write_map_end()
177    }
178
179    fn flush(&mut self) -> crate::Result<()> {
180        self.inner.flush()
181    }
182
183    // utility
184    //
185
186    fn write_byte(&mut self, b: u8) -> crate::Result<()> {
187        self.inner.write_byte(b)
188    }
189}
190
191#[cfg(test)]
192mod tests {
193
194    use crate::protocol::{
195        TBinaryOutputProtocol, TMessageIdentifier, TMessageType, TOutputProtocol,
196    };
197    use crate::transport::{TBufferChannel, TIoChannel, WriteHalf};
198
199    use super::*;
200
201    #[test]
202    fn must_write_message_begin_with_prefixed_service_name() {
203        let mut o_prot = test_objects();
204
205        let ident = TMessageIdentifier::new("bar", TMessageType::Call, 2);
206        assert_success!(o_prot.write_message_begin(&ident));
207
208        #[rustfmt::skip]
209        let expected: [u8; 19] = [
210            0x80,
211            0x01, /* protocol identifier */
212            0x00,
213            0x01, /* message type */
214            0x00,
215            0x00,
216            0x00,
217            0x07,
218            0x66,
219            0x6F,
220            0x6F, /* "foo" */
221            0x3A, /* ":" */
222            0x62,
223            0x61,
224            0x72, /* "bar" */
225            0x00,
226            0x00,
227            0x00,
228            0x02 /* sequence number */,
229        ];
230
231        assert_eq!(o_prot.inner.transport.write_bytes(), expected);
232    }
233
234    fn test_objects() -> TMultiplexedOutputProtocol<TBinaryOutputProtocol<WriteHalf<TBufferChannel>>>
235    {
236        let c = TBufferChannel::with_capacity(40, 40);
237        let (_, w_chan) = c.split().unwrap();
238        let prot = TBinaryOutputProtocol::new(w_chan, true);
239        TMultiplexedOutputProtocol::new("foo", prot)
240    }
241}