tiberius/tds/stream/
query.rs

1use crate::tds::stream::ReceivedToken;
2use crate::{row::ColumnType, Column, Row};
3use futures_util::{
4    ready,
5    stream::{BoxStream, Peekable, Stream, StreamExt, TryStreamExt},
6};
7use std::{
8    fmt::Debug,
9    pin::Pin,
10    sync::Arc,
11    task::{self, Poll},
12};
13
14/// A set of `Streams` of [`QueryItem`] values, which can be either result
15/// metadata or a row.
16///
17/// The `QueryStream` needs to be polled empty before sending another query to
18/// the [`Client`], failing to do so causes a flush before the next query,
19/// slowing it down in an undeterministic way.
20///
21/// Every stream starts with metadata, describing the structure of the incoming
22/// rows, e.g. the columns in the order they are presented in every row.
23///
24/// If after consuming rows from the stream, another metadata result arrives, it
25/// means the stream has multiple results from different queries. This new
26/// metadata item will describe the next rows from here forwards.
27///
28/// If having one set of results in the response, using [`into_row_stream`]
29/// might be more convenient to use.
30///
31/// The struct provides non-streaming APIs with [`into_results`],
32/// [`into_first_result`] and [`into_row`].
33///
34/// # Example
35///
36/// ```
37/// # use tiberius::{Config, QueryItem};
38/// # use tokio_util::compat::TokioAsyncWriteCompatExt;
39/// # use std::env;
40/// # use futures_util::stream::TryStreamExt;
41/// # #[tokio::main]
42/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
43/// # let c_str = env::var("TIBERIUS_TEST_CONNECTION_STRING").unwrap_or(
44/// #     "server=tcp:localhost,1433;integratedSecurity=true;TrustServerCertificate=true".to_owned(),
45/// # );
46/// # let config = Config::from_ado_string(&c_str)?;
47/// # let tcp = tokio::net::TcpStream::connect(config.get_addr()).await?;
48/// # tcp.set_nodelay(true)?;
49/// # let mut client = tiberius::Client::connect(config, tcp.compat_write()).await?;
50/// let mut stream = client
51///     .query(
52///         "SELECT @P1 AS first; SELECT @P2 AS second",
53///         &[&1i32, &2i32],
54///     )
55///     .await?;
56///
57/// // The stream consists of four items, in the following order:
58/// // - Metadata from `SELECT 1`
59/// // - The only resulting row from `SELECT 1`
60/// // - Metadata from `SELECT 2`
61/// // - The only resulting row from `SELECT 2`
62/// while let Some(item) = stream.try_next().await? {
63///     match item {
64///         // our first item is the column data always
65///         QueryItem::Metadata(meta) if meta.result_index() == 0 => {
66///             // the first result column info can be handled here
67///         }
68///         // ... and from there on from 0..N rows
69///         QueryItem::Row(row) if row.result_index() == 0 => {
70///             assert_eq!(Some(1), row.get(0));
71///         }
72///         // the second result set returns first another metadata item
73///         QueryItem::Metadata(meta) => {
74///             // .. handling
75///         }
76///         // ...and, again, we get rows from the second resultset
77///         QueryItem::Row(row) => {
78///             assert_eq!(Some(2), row.get(0));
79///         }
80///     }
81/// }
82/// # Ok(())
83/// # }
84/// ```
85///
86/// [`Client`]: struct.Client.html
87/// [`into_row_stream`]: struct.QueryStream.html#method.into_row_stream
88/// [`into_results`]: struct.QueryStream.html#method.into_results
89/// [`into_first_result`]: struct.QueryStream.html#method.into_first_result
90/// [`into_row`]: struct.QueryStream.html#method.into_row
91pub struct QueryStream<'a> {
92    token_stream: Peekable<BoxStream<'a, crate::Result<ReceivedToken>>>,
93    columns: Option<Arc<Vec<Column>>>,
94    result_set_index: Option<usize>,
95}
96
97impl<'a> Debug for QueryStream<'a> {
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        f.debug_struct("QueryStream")
100            .field(
101                "token_stream",
102                &"BoxStream<'a, crate::Result<ReceivedToken>>",
103            )
104            .finish()
105    }
106}
107
108impl<'a> QueryStream<'a> {
109    pub(crate) fn new(token_stream: BoxStream<'a, crate::Result<ReceivedToken>>) -> Self {
110        Self {
111            token_stream: token_stream.peekable(),
112            columns: None,
113            result_set_index: None,
114        }
115    }
116
117    /// Moves the stream forward until having result metadata, stream end or an
118    /// error.
119    pub(crate) async fn forward_to_metadata(&mut self) -> crate::Result<()> {
120        loop {
121            let item = Pin::new(&mut self.token_stream)
122                .peek()
123                .await
124                .map(|r| r.as_ref().map_err(|e| e.clone()))
125                .transpose()?;
126
127            match item {
128                Some(ReceivedToken::NewResultset(_)) => break,
129                Some(_) => {
130                    self.token_stream.try_next().await?;
131                }
132                None => break,
133            }
134        }
135
136        Ok(())
137    }
138
139    /// The list of columns either for the current result set, or for the next
140    /// one. If the stream is just created, or if the next item in the stream
141    /// contains metadata, the metadata will be taken from the stream. Otherwise
142    /// the columns will be returned from the cache and reflect on the current
143    /// result set.
144    ///
145    /// # Example
146    ///
147    /// ```
148    /// # use tiberius::Config;
149    /// # use tokio_util::compat::TokioAsyncWriteCompatExt;
150    /// # use std::env;
151    /// # use futures_util::stream::TryStreamExt;
152    /// # #[tokio::main]
153    /// # async fn main() -> anyhow::Result<()> {
154    /// # let c_str = env::var("TIBERIUS_TEST_CONNECTION_STRING").unwrap_or(
155    /// #     "server=tcp:localhost,1433;integratedSecurity=true;TrustServerCertificate=true".to_owned(),
156    /// # );
157    /// # let config = Config::from_ado_string(&c_str)?;
158    /// # let tcp = tokio::net::TcpStream::connect(config.get_addr()).await?;
159    /// # tcp.set_nodelay(true)?;
160    /// # let mut client = tiberius::Client::connect(config, tcp.compat_write()).await?;
161    /// let mut stream = client
162    ///     .query(
163    ///         "SELECT @P1 AS first; SELECT @P2 AS second",
164    ///         &[&1i32, &2i32],
165    ///     )
166    ///     .await?;
167    ///
168    /// // Nothing is fetched, the first result set starts.
169    /// let cols = stream.columns().await?.unwrap();
170    /// assert_eq!("first", cols[0].name());
171    ///
172    /// // Move over the metadata.
173    /// stream.try_next().await?;
174    ///
175    /// // We're in the first row, seeing the metadata for that set.
176    /// let cols = stream.columns().await?.unwrap();
177    /// assert_eq!("first", cols[0].name());
178    ///
179    /// // Move over the only row in the first set.
180    /// stream.try_next().await?;
181    ///
182    /// // End of the first set, getting the metadata by peaking the next item.
183    /// let cols = stream.columns().await?.unwrap();
184    /// assert_eq!("second", cols[0].name());
185    /// # Ok(())
186    /// # }
187    /// ```
188    pub async fn columns(&mut self) -> crate::Result<Option<&[Column]>> {
189        use ReceivedToken::*;
190
191        loop {
192            let item = Pin::new(&mut self.token_stream)
193                .peek()
194                .await
195                .map(|r| r.as_ref().map_err(|e| e.clone()))
196                .transpose()?;
197
198            match item {
199                Some(token) => match token {
200                    NewResultset(metadata) => {
201                        self.columns = Some(Arc::new(metadata.columns().collect()));
202                        break;
203                    }
204                    Row(_) => {
205                        break;
206                    }
207                    _ => {
208                        self.token_stream.try_next().await?;
209                        continue;
210                    }
211                },
212                None => {
213                    break;
214                }
215            }
216        }
217
218        Ok(self.columns.as_ref().map(|c| c.as_slice()))
219    }
220
221    /// Collects results from all queries in the stream into memory in the order
222    /// of querying.
223    pub async fn into_results(mut self) -> crate::Result<Vec<Vec<Row>>> {
224        let mut results: Vec<Vec<Row>> = Vec::new();
225        let mut result: Option<Vec<Row>> = None;
226
227        while let Some(item) = self.try_next().await? {
228            match (item, &mut result) {
229                (QueryItem::Row(row), None) => {
230                    result = Some(vec![row]);
231                }
232                (QueryItem::Row(row), Some(ref mut result)) => result.push(row),
233                (QueryItem::Metadata(_), None) => {
234                    result = Some(Vec::new());
235                }
236                (QueryItem::Metadata(_), ref mut previous_result) => {
237                    results.push(previous_result.take().unwrap());
238                    result = None;
239                }
240            }
241        }
242
243        if let Some(result) = result {
244            results.push(result);
245        }
246
247        Ok(results)
248    }
249
250    /// Collects the output of the first query, dropping any further
251    /// results.
252    pub async fn into_first_result(self) -> crate::Result<Vec<Row>> {
253        let mut results = self.into_results().await?.into_iter();
254        let rows = results.next().unwrap_or_default();
255
256        Ok(rows)
257    }
258
259    /// Collects the first row from the output of the first query, dropping any
260    /// further rows.
261    pub async fn into_row(self) -> crate::Result<Option<Row>> {
262        let mut results = self.into_first_result().await?.into_iter();
263
264        Ok(results.next())
265    }
266
267    /// Convert the stream into a stream of rows, skipping metadata items.
268    pub fn into_row_stream(self) -> BoxStream<'a, crate::Result<Row>> {
269        let s = self.try_filter_map(|item| async {
270            match item {
271                QueryItem::Row(row) => Ok(Some(row)),
272                QueryItem::Metadata(_) => Ok(None),
273            }
274        });
275
276        Box::pin(s)
277    }
278}
279
280/// Info about the following stream of rows.
281#[derive(Debug, Clone)]
282pub struct ResultMetadata {
283    columns: Arc<Vec<Column>>,
284    result_index: usize,
285}
286
287impl ResultMetadata {
288    /// Column info. The order is the same as in the following rows.
289    pub fn columns(&self) -> &[Column] {
290        &self.columns
291    }
292
293    /// The number of the result set, an incrementing value starting from zero,
294    /// which gives an indication of the position of the result set in the
295    /// stream.
296    pub fn result_index(&self) -> usize {
297        self.result_index
298    }
299}
300
301/// Resulting data from a query.
302#[derive(Debug)]
303pub enum QueryItem {
304    /// A single row of data.
305    Row(Row),
306    /// Information of the upcoming row data.
307    Metadata(ResultMetadata),
308}
309
310impl QueryItem {
311    pub(crate) fn metadata(columns: Arc<Vec<Column>>, result_index: usize) -> Self {
312        Self::Metadata(ResultMetadata {
313            columns,
314            result_index,
315        })
316    }
317
318    /// Returns a reference to the metadata, if the item is of a correct variant.
319    pub fn as_metadata(&self) -> Option<&ResultMetadata> {
320        match self {
321            QueryItem::Row(_) => None,
322            QueryItem::Metadata(ref metadata) => Some(metadata),
323        }
324    }
325
326    /// Returns a reference to the row, if the item is of a correct variant.
327    pub fn as_row(&self) -> Option<&Row> {
328        match self {
329            QueryItem::Row(ref row) => Some(row),
330            QueryItem::Metadata(_) => None,
331        }
332    }
333
334    /// Returns the metadata, if the item is of a correct variant.
335    pub fn into_metadata(self) -> Option<ResultMetadata> {
336        match self {
337            QueryItem::Row(_) => None,
338            QueryItem::Metadata(metadata) => Some(metadata),
339        }
340    }
341
342    /// Returns the row, if the item is of a correct variant.
343    pub fn into_row(self) -> Option<Row> {
344        match self {
345            QueryItem::Row(row) => Some(row),
346            QueryItem::Metadata(_) => None,
347        }
348    }
349}
350
351impl<'a> Stream for QueryStream<'a> {
352    type Item = crate::Result<QueryItem>;
353
354    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
355        let this = self.get_mut();
356
357        loop {
358            let token = match ready!(this.token_stream.poll_next_unpin(cx)) {
359                Some(res) => res?,
360                None => return Poll::Ready(None),
361            };
362
363            return match token {
364                ReceivedToken::NewResultset(meta) => {
365                    let column_meta = meta
366                        .columns
367                        .iter()
368                        .map(|x| Column {
369                            name: x.col_name.to_string(),
370                            column_type: ColumnType::from(&x.base.ty),
371                        })
372                        .collect::<Vec<_>>();
373
374                    let column_meta = Arc::new(column_meta);
375                    this.columns = Some(column_meta.clone());
376
377                    this.result_set_index = this.result_set_index.map(|i| i + 1);
378
379                    let query_item =
380                        QueryItem::metadata(column_meta, *this.result_set_index.get_or_insert(0));
381
382                    return Poll::Ready(Some(Ok(query_item)));
383                }
384                ReceivedToken::Row(data) => {
385                    let columns = this.columns.as_ref().unwrap().clone();
386                    let result_index = this.result_set_index.unwrap();
387
388                    let row = Row {
389                        columns,
390                        data,
391                        result_index,
392                    };
393
394                    Poll::Ready(Some(Ok(QueryItem::Row(row))))
395                }
396                _ => continue,
397            };
398        }
399    }
400}