tiberius/tds/stream/query.rs
1use crate::tds::stream::ReceivedToken;
2use crate::{row::ColumnType, Column, Row};
3use futures_util::{
4 ready,
5 stream::{BoxStream, Peekable, Stream, StreamExt, TryStreamExt},
6};
7use std::{
8 fmt::Debug,
9 pin::Pin,
10 sync::Arc,
11 task::{self, Poll},
12};
13
14/// A set of `Streams` of [`QueryItem`] values, which can be either result
15/// metadata or a row.
16///
/// The `QueryStream` needs to be polled empty before sending another query to
/// the [`Client`]. Failing to do so causes a flush before the next query,
/// slowing it down in a non-deterministic way.
20///
21/// Every stream starts with metadata, describing the structure of the incoming
22/// rows, e.g. the columns in the order they are presented in every row.
23///
24/// If after consuming rows from the stream, another metadata result arrives, it
25/// means the stream has multiple results from different queries. This new
26/// metadata item will describe the next rows from here forwards.
27///
/// If the response contains only a single set of results, using
/// [`into_row_stream`] might be more convenient.
30///
31/// The struct provides non-streaming APIs with [`into_results`],
32/// [`into_first_result`] and [`into_row`].
33///
34/// # Example
35///
36/// ```
37/// # use tiberius::{Config, QueryItem};
38/// # use tokio_util::compat::TokioAsyncWriteCompatExt;
39/// # use std::env;
40/// # use futures_util::stream::TryStreamExt;
41/// # #[tokio::main]
42/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
43/// # let c_str = env::var("TIBERIUS_TEST_CONNECTION_STRING").unwrap_or(
44/// # "server=tcp:localhost,1433;integratedSecurity=true;TrustServerCertificate=true".to_owned(),
45/// # );
46/// # let config = Config::from_ado_string(&c_str)?;
47/// # let tcp = tokio::net::TcpStream::connect(config.get_addr()).await?;
48/// # tcp.set_nodelay(true)?;
49/// # let mut client = tiberius::Client::connect(config, tcp.compat_write()).await?;
50/// let mut stream = client
51/// .query(
52/// "SELECT @P1 AS first; SELECT @P2 AS second",
53/// &[&1i32, &2i32],
54/// )
55/// .await?;
56///
57/// // The stream consists of four items, in the following order:
58/// // - Metadata from `SELECT 1`
59/// // - The only resulting row from `SELECT 1`
60/// // - Metadata from `SELECT 2`
61/// // - The only resulting row from `SELECT 2`
62/// while let Some(item) = stream.try_next().await? {
63/// match item {
64/// // our first item is the column data always
65/// QueryItem::Metadata(meta) if meta.result_index() == 0 => {
66/// // the first result column info can be handled here
67/// }
68/// // ... and from there on from 0..N rows
69/// QueryItem::Row(row) if row.result_index() == 0 => {
70/// assert_eq!(Some(1), row.get(0));
71/// }
72/// // the second result set returns first another metadata item
73/// QueryItem::Metadata(meta) => {
74/// // .. handling
75/// }
76/// // ...and, again, we get rows from the second resultset
77/// QueryItem::Row(row) => {
78/// assert_eq!(Some(2), row.get(0));
79/// }
80/// }
81/// }
82/// # Ok(())
83/// # }
84/// ```
85///
86/// [`Client`]: struct.Client.html
87/// [`into_row_stream`]: struct.QueryStream.html#method.into_row_stream
88/// [`into_results`]: struct.QueryStream.html#method.into_results
89/// [`into_first_result`]: struct.QueryStream.html#method.into_first_result
90/// [`into_row`]: struct.QueryStream.html#method.into_row
91pub struct QueryStream<'a> {
92 token_stream: Peekable<BoxStream<'a, crate::Result<ReceivedToken>>>,
93 columns: Option<Arc<Vec<Column>>>,
94 result_set_index: Option<usize>,
95}
96
97impl<'a> Debug for QueryStream<'a> {
98 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99 f.debug_struct("QueryStream")
100 .field(
101 "token_stream",
102 &"BoxStream<'a, crate::Result<ReceivedToken>>",
103 )
104 .finish()
105 }
106}
107
108impl<'a> QueryStream<'a> {
109 pub(crate) fn new(token_stream: BoxStream<'a, crate::Result<ReceivedToken>>) -> Self {
110 Self {
111 token_stream: token_stream.peekable(),
112 columns: None,
113 result_set_index: None,
114 }
115 }
116
117 /// Moves the stream forward until having result metadata, stream end or an
118 /// error.
119 pub(crate) async fn forward_to_metadata(&mut self) -> crate::Result<()> {
120 loop {
121 let item = Pin::new(&mut self.token_stream)
122 .peek()
123 .await
124 .map(|r| r.as_ref().map_err(|e| e.clone()))
125 .transpose()?;
126
127 match item {
128 Some(ReceivedToken::NewResultset(_)) => break,
129 Some(_) => {
130 self.token_stream.try_next().await?;
131 }
132 None => break,
133 }
134 }
135
136 Ok(())
137 }
138
139 /// The list of columns either for the current result set, or for the next
140 /// one. If the stream is just created, or if the next item in the stream
141 /// contains metadata, the metadata will be taken from the stream. Otherwise
142 /// the columns will be returned from the cache and reflect on the current
143 /// result set.
144 ///
145 /// # Example
146 ///
147 /// ```
148 /// # use tiberius::Config;
149 /// # use tokio_util::compat::TokioAsyncWriteCompatExt;
150 /// # use std::env;
151 /// # use futures_util::stream::TryStreamExt;
152 /// # #[tokio::main]
153 /// # async fn main() -> anyhow::Result<()> {
154 /// # let c_str = env::var("TIBERIUS_TEST_CONNECTION_STRING").unwrap_or(
155 /// # "server=tcp:localhost,1433;integratedSecurity=true;TrustServerCertificate=true".to_owned(),
156 /// # );
157 /// # let config = Config::from_ado_string(&c_str)?;
158 /// # let tcp = tokio::net::TcpStream::connect(config.get_addr()).await?;
159 /// # tcp.set_nodelay(true)?;
160 /// # let mut client = tiberius::Client::connect(config, tcp.compat_write()).await?;
161 /// let mut stream = client
162 /// .query(
163 /// "SELECT @P1 AS first; SELECT @P2 AS second",
164 /// &[&1i32, &2i32],
165 /// )
166 /// .await?;
167 ///
168 /// // Nothing is fetched, the first result set starts.
169 /// let cols = stream.columns().await?.unwrap();
170 /// assert_eq!("first", cols[0].name());
171 ///
172 /// // Move over the metadata.
173 /// stream.try_next().await?;
174 ///
175 /// // We're in the first row, seeing the metadata for that set.
176 /// let cols = stream.columns().await?.unwrap();
177 /// assert_eq!("first", cols[0].name());
178 ///
179 /// // Move over the only row in the first set.
180 /// stream.try_next().await?;
181 ///
182 /// // End of the first set, getting the metadata by peaking the next item.
183 /// let cols = stream.columns().await?.unwrap();
184 /// assert_eq!("second", cols[0].name());
185 /// # Ok(())
186 /// # }
187 /// ```
188 pub async fn columns(&mut self) -> crate::Result<Option<&[Column]>> {
189 use ReceivedToken::*;
190
191 loop {
192 let item = Pin::new(&mut self.token_stream)
193 .peek()
194 .await
195 .map(|r| r.as_ref().map_err(|e| e.clone()))
196 .transpose()?;
197
198 match item {
199 Some(token) => match token {
200 NewResultset(metadata) => {
201 self.columns = Some(Arc::new(metadata.columns().collect()));
202 break;
203 }
204 Row(_) => {
205 break;
206 }
207 _ => {
208 self.token_stream.try_next().await?;
209 continue;
210 }
211 },
212 None => {
213 break;
214 }
215 }
216 }
217
218 Ok(self.columns.as_ref().map(|c| c.as_slice()))
219 }
220
221 /// Collects results from all queries in the stream into memory in the order
222 /// of querying.
223 pub async fn into_results(mut self) -> crate::Result<Vec<Vec<Row>>> {
224 let mut results: Vec<Vec<Row>> = Vec::new();
225 let mut result: Option<Vec<Row>> = None;
226
227 while let Some(item) = self.try_next().await? {
228 match (item, &mut result) {
229 (QueryItem::Row(row), None) => {
230 result = Some(vec![row]);
231 }
232 (QueryItem::Row(row), Some(ref mut result)) => result.push(row),
233 (QueryItem::Metadata(_), None) => {
234 result = Some(Vec::new());
235 }
236 (QueryItem::Metadata(_), ref mut previous_result) => {
237 results.push(previous_result.take().unwrap());
238 result = None;
239 }
240 }
241 }
242
243 if let Some(result) = result {
244 results.push(result);
245 }
246
247 Ok(results)
248 }
249
250 /// Collects the output of the first query, dropping any further
251 /// results.
252 pub async fn into_first_result(self) -> crate::Result<Vec<Row>> {
253 let mut results = self.into_results().await?.into_iter();
254 let rows = results.next().unwrap_or_default();
255
256 Ok(rows)
257 }
258
259 /// Collects the first row from the output of the first query, dropping any
260 /// further rows.
261 pub async fn into_row(self) -> crate::Result<Option<Row>> {
262 let mut results = self.into_first_result().await?.into_iter();
263
264 Ok(results.next())
265 }
266
267 /// Convert the stream into a stream of rows, skipping metadata items.
268 pub fn into_row_stream(self) -> BoxStream<'a, crate::Result<Row>> {
269 let s = self.try_filter_map(|item| async {
270 match item {
271 QueryItem::Row(row) => Ok(Some(row)),
272 QueryItem::Metadata(_) => Ok(None),
273 }
274 });
275
276 Box::pin(s)
277 }
278}
279
280/// Info about the following stream of rows.
281#[derive(Debug, Clone)]
282pub struct ResultMetadata {
283 columns: Arc<Vec<Column>>,
284 result_index: usize,
285}
286
287impl ResultMetadata {
288 /// Column info. The order is the same as in the following rows.
289 pub fn columns(&self) -> &[Column] {
290 &self.columns
291 }
292
293 /// The number of the result set, an incrementing value starting from zero,
294 /// which gives an indication of the position of the result set in the
295 /// stream.
296 pub fn result_index(&self) -> usize {
297 self.result_index
298 }
299}
300
301/// Resulting data from a query.
302#[derive(Debug)]
303pub enum QueryItem {
304 /// A single row of data.
305 Row(Row),
306 /// Information of the upcoming row data.
307 Metadata(ResultMetadata),
308}
309
310impl QueryItem {
311 pub(crate) fn metadata(columns: Arc<Vec<Column>>, result_index: usize) -> Self {
312 Self::Metadata(ResultMetadata {
313 columns,
314 result_index,
315 })
316 }
317
318 /// Returns a reference to the metadata, if the item is of a correct variant.
319 pub fn as_metadata(&self) -> Option<&ResultMetadata> {
320 match self {
321 QueryItem::Row(_) => None,
322 QueryItem::Metadata(ref metadata) => Some(metadata),
323 }
324 }
325
326 /// Returns a reference to the row, if the item is of a correct variant.
327 pub fn as_row(&self) -> Option<&Row> {
328 match self {
329 QueryItem::Row(ref row) => Some(row),
330 QueryItem::Metadata(_) => None,
331 }
332 }
333
334 /// Returns the metadata, if the item is of a correct variant.
335 pub fn into_metadata(self) -> Option<ResultMetadata> {
336 match self {
337 QueryItem::Row(_) => None,
338 QueryItem::Metadata(metadata) => Some(metadata),
339 }
340 }
341
342 /// Returns the row, if the item is of a correct variant.
343 pub fn into_row(self) -> Option<Row> {
344 match self {
345 QueryItem::Row(row) => Some(row),
346 QueryItem::Metadata(_) => None,
347 }
348 }
349}
350
351impl<'a> Stream for QueryStream<'a> {
352 type Item = crate::Result<QueryItem>;
353
354 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
355 let this = self.get_mut();
356
357 loop {
358 let token = match ready!(this.token_stream.poll_next_unpin(cx)) {
359 Some(res) => res?,
360 None => return Poll::Ready(None),
361 };
362
363 return match token {
364 ReceivedToken::NewResultset(meta) => {
365 let column_meta = meta
366 .columns
367 .iter()
368 .map(|x| Column {
369 name: x.col_name.to_string(),
370 column_type: ColumnType::from(&x.base.ty),
371 })
372 .collect::<Vec<_>>();
373
374 let column_meta = Arc::new(column_meta);
375 this.columns = Some(column_meta.clone());
376
377 this.result_set_index = this.result_set_index.map(|i| i + 1);
378
379 let query_item =
380 QueryItem::metadata(column_meta, *this.result_set_index.get_or_insert(0));
381
382 return Poll::Ready(Some(Ok(query_item)));
383 }
384 ReceivedToken::Row(data) => {
385 let columns = this.columns.as_ref().unwrap().clone();
386 let result_index = this.result_set_index.unwrap();
387
388 let row = Row {
389 columns,
390 data,
391 result_index,
392 };
393
394 Poll::Ready(Some(Ok(QueryItem::Row(row))))
395 }
396 _ => continue,
397 };
398 }
399 }
400}