mysql_async/queryable/query_result/
mod.rs

1// Copyright (c) 2017 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use mysql_common::row::convert::FromRowError;
10
11use std::{borrow::Cow, fmt, marker::PhantomData, result::Result as StdResult, sync::Arc};
12
13use crate::{
14    conn::{routines::NextSetRoutine, PendingResult},
15    connection_like::Connection,
16    error::*,
17    prelude::{FromRow, Protocol},
18    Column, Row,
19};
20
21pub mod result_set_stream;
22mod tests;
23
/// Result set metadata.
///
/// Both variants carry the shared column list of the result set; the variant
/// itself records whether the set is a text or a binary one.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ResultSetMeta {
    /// Text result set, that may contain rows.
    Text(Arc<[Column]>),
    /// Binary result set, that may contain rows.
    Binary(Arc<[Column]>),
}
32
33impl ResultSetMeta {
34    fn columns(&self) -> &Arc<[Column]> {
35        match self {
36            ResultSetMeta::Text(cols) | ResultSetMeta::Binary(cols) => cols,
37        }
38    }
39}
40
/// Result of a query or statement execution.
///
/// Represents an asynchronous query result, that may not be fully consumed.
///
/// # Note
///
/// Unconsumed query results are dropped implicitly when corresponding connection
/// is dropped or queried. Also note, that in this case all remaining errors will be
/// emitted to the caller:
///
/// ```rust
/// # use mysql_async::test_misc::get_opts;
/// # #[tokio::main]
/// # async fn main() -> mysql_async::Result<()> {
/// use mysql_async::*;
/// use mysql_async::prelude::*;
/// let mut conn = Conn::new(get_opts()).await?;
///
/// // second result set will contain an error,
/// // but the first result set is ok, so this line will pass
/// conn.query_iter("DO 1; BLABLA;").await?;
/// // `QueryResult` was dropped without being consumed
///
/// // driver must cleanup any unconsumed result to perform another query on `conn`,
/// // so this operation will be performed implicitly, but the unconsumed result
/// // contains an error complaining about 'BLABLA', so this error will be emitted here:
/// assert!(conn.query_iter("DO 1").await.unwrap_err().to_string().contains("BLABLA"));
///
/// # conn.disconnect().await }
/// ```
pub struct QueryResult<'a, 't: 'a, P> {
    // Connection this result is read from; all pending-result state lives on it.
    conn: Connection<'a, 't>,
    // Marker tying the result to its protocol parameter `P`; no `P` value is stored.
    __phantom: PhantomData<P>,
}
75
76impl<'a, 't: 'a, P> fmt::Debug for QueryResult<'a, 't, P> {
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        f.debug_struct("QueryResult")
79            .field("conn", &self.conn)
80            .field("__phantom", &"PhantomData<P>")
81            .finish()
82    }
83}
84
85impl<'a, 't: 'a, P> QueryResult<'a, 't, P>
86where
87    P: Protocol,
88{
89    pub fn new<T: Into<Connection<'a, 't>>>(conn: T) -> Self {
90        QueryResult {
91            conn: conn.into(),
92            __phantom: PhantomData,
93        }
94    }
95
96    /// Returns `true` if this query result may contain rows.
97    ///
98    /// If `false` then no rows possible for this query tesult (e.g. result of an UPDATE query).
99    fn has_rows(&self) -> bool {
100        self.conn
101            .get_pending_result()
102            .map(|pending_result| match pending_result {
103                Some(PendingResult::Pending(meta)) => meta.columns().len() > 0,
104                Some(PendingResult::Taken(meta)) => meta.columns().len() > 0,
105                None => false,
106            })
107            .unwrap_or(false)
108    }
109
110    /// `true` if there are no more rows nor result sets in this query.
111    ///
112    /// This function will return `false` if the last result set was taken
113    /// by the [`QueryResult::stream`] that was dropped before being fully consumed
114    /// (i.e. caller will get `false` even if QueryResult data is reachable only for library internals).
115    pub fn is_empty(&self) -> bool {
116        !self.has_rows() && !self.conn.more_results_exists()
117    }
118
119    /// Low-level function that reads a result set row.
120    ///
121    /// Returns `None` if there are no more rows in the current set.
122    async fn next_row(&mut self, columns: Arc<[Column]>) -> crate::Result<Option<Row>> {
123        let mut row = None;
124
125        if columns.is_empty() {
126            // Empty, but not yet consumed result set.
127            self.conn.as_mut().set_pending_result(None)?;
128        } else {
129            // Not yet consumed non-empty result set.
130            let packet = match self.conn.as_mut().read_packet().await {
131                Ok(packet) => packet,
132                Err(err) => {
133                    // Next row contained an error. No more data will follow.
134                    self.conn.as_mut().set_pending_result(None)?;
135                    return Err(err);
136                }
137            };
138
139            if P::is_last_result_set_packet(self.conn.capabilities(), &packet) {
140                // `packet` is a result set terminator.
141                self.conn.as_mut().set_pending_result(None)?;
142            } else {
143                // `packet` is a result set row.
144                row = Some(P::read_result_set_row(&packet, columns)?);
145            }
146        }
147
148        Ok(row)
149    }
150
151    /// Low-level function that jumps to the next result set.
152    ///
153    /// Returns `false` if there are no more result sets.
154    async fn next_set(&mut self) -> crate::Result<bool> {
155        if self.conn.more_results_exists() {
156            // More data will follow.
157            self.conn
158                .as_mut()
159                .routine(NextSetRoutine::<P>::new())
160                .await?;
161        }
162        Ok(self.conn.has_pending_result())
163    }
164
165    /// Low-level function that reads a next row and tries to jump
166    /// to the next result set if the current one is exhausted.
167    async fn next_row_or_next_set(&mut self, meta: ResultSetMeta) -> crate::Result<Option<Row>> {
168        let columns = meta.columns().clone();
169
170        self.next_row_or_next_set2(columns).await
171    }
172
173    /// Low-level function that reads a next row and tries to jump
174    /// to the next result set if the current one is exhausted.
175    async fn next_row_or_next_set2(
176        &mut self,
177        columns: Arc<[Column]>,
178    ) -> crate::Result<Option<Row>> {
179        if let Some(row) = self.next_row(columns).await? {
180            Ok(Some(row))
181        } else {
182            self.next_set().await?;
183            Ok(None)
184        }
185    }
186
187    /// Skips the taken result set.
188    async fn skip_taken(&mut self, meta: Arc<ResultSetMeta>) -> crate::Result<()> {
189        while (self.next_row_or_next_set((*meta).clone()).await?).is_some() {}
190        Ok(())
191    }
192
193    #[doc(hidden)]
194    pub async fn next(&mut self) -> Result<Option<Row>> {
195        loop {
196            match self.conn.as_mut().use_pending_result()?.cloned() {
197                Some(PendingResult::Pending(meta)) => return self.next_row_or_next_set(meta).await,
198                Some(PendingResult::Taken(meta)) => self.skip_taken(meta).await?,
199                None => return Ok(None),
200            }
201        }
202    }
203
    /// Last insert id, if any.
    ///
    /// The value is taken from the underlying connection.
    pub fn last_insert_id(&self) -> Option<u64> {
        self.conn.last_insert_id()
    }
208
    /// Number of affected rows as reported by the server, or `0`.
    ///
    /// The value is taken from the underlying connection.
    pub fn affected_rows(&self) -> u64 {
        self.conn.affected_rows()
    }
213
    /// Text information as reported by the server, or an empty string.
    ///
    /// The value is taken from the underlying connection.
    pub fn info(&self) -> Cow<'_, str> {
        self.conn.info()
    }
218
    /// Number of warnings as reported by the server, or `0`.
    ///
    /// The value is taken from the underlying connection.
    pub fn warnings(&self) -> u16 {
        self.conn.get_warnings()
    }
223
224    /// Collects the current result set of this query result.
225    ///
226    /// It is parametrized by `R` and internally calls `R::from_row(Row)` on each row.
227    ///
228    /// It will collect rows up to a neares result set boundary. This means that you should call
229    /// `collect` as many times as result sets in your query result. For example query
230    /// `SELECT 'foo'; SELECT 'foo', 'bar';` will produce `QueryResult` with two result sets in it.
231    /// One can use `QueryResult::is_empty` to make sure that there is no more result sets.
232    ///
233    /// # Panic
234    ///
235    /// It'll panic if any row isn't convertible to `R` (i.e. programmer error or unknown schema).
236    /// * In case of programmer error see [`FromRow`] docs;
237    /// * In case of unknown schema use [`QueryResult::try_collect`].
238    pub async fn collect<R>(&mut self) -> Result<Vec<R>>
239    where
240        R: FromRow + Send + 'static,
241    {
242        self.reduce(Vec::new(), |mut acc, row| {
243            acc.push(FromRow::from_row(row));
244            acc
245        })
246        .await
247    }
248
249    /// Collects the current result set of this query result.
250    ///
251    /// It works the same way as [`QueryResult::collect`] but won't panic if row isn't convertible
252    /// to `R`.
253    pub async fn try_collect<R>(&mut self) -> Result<Vec<StdResult<R, FromRowError>>>
254    where
255        R: FromRow + Send + 'static,
256    {
257        self.reduce(Vec::new(), |mut acc, row| {
258            acc.push(FromRow::from_row_opt(row));
259            acc
260        })
261        .await
262    }
263
264    /// Collects the current result set of this query result and drops everything else.
265    ///
266    /// # Panic
267    ///
268    /// It'll panic if any row isn't convertible to `R` (i.e. programmer error or unknown schema).
269    /// * In case of programmer error see `FromRow` docs;
270    /// * In case of unknown schema use [`QueryResult::try_collect`].
271    pub async fn collect_and_drop<R>(mut self) -> Result<Vec<R>>
272    where
273        R: FromRow + Send + 'static,
274    {
275        let output = self.collect::<R>().await?;
276        self.drop_result().await?;
277        Ok(output)
278    }
279
280    /// Collects the current result set of this query result and drops everything else.
281    ///
282    /// It works the same way as [`QueryResult::collect_and_drop`] but won't panic if row isn't
283    /// convertible to `R`.
284    pub async fn try_collect_and_drop<R>(mut self) -> Result<Vec<StdResult<R, FromRowError>>>
285    where
286        R: FromRow + Send + 'static,
287    {
288        let output = self.try_collect().await?;
289        self.drop_result().await?;
290        Ok(output)
291    }
292
293    /// Executes `fun` on every row of the current result set.
294    ///
295    /// It will stop on the nearest result set boundary (see `QueryResult::collect` docs).
296    pub async fn for_each<F>(&mut self, mut fun: F) -> Result<()>
297    where
298        F: FnMut(Row),
299    {
300        if self.is_empty() {
301            Ok(())
302        } else {
303            while let Some(row) = self.next().await? {
304                fun(row);
305            }
306            Ok(())
307        }
308    }
309
310    /// Executes `fun` on every row of the current result set and drops everything else.
311    pub async fn for_each_and_drop<F>(mut self, fun: F) -> Result<()>
312    where
313        F: FnMut(Row),
314    {
315        self.for_each(fun).await?;
316        self.drop_result().await?;
317        Ok(())
318    }
319
320    /// Maps every row of the current result set to `U` using `fun`.
321    ///
322    /// It will stop on the nearest result set boundary (see `QueryResult::collect` docs).
323    pub async fn map<F, U>(&mut self, mut fun: F) -> Result<Vec<U>>
324    where
325        F: FnMut(Row) -> U,
326    {
327        let mut acc = Vec::new();
328        while let Some(row) = self.next().await? {
329            acc.push(fun(crate::from_row(row)));
330        }
331        Ok(acc)
332    }
333
334    /// Map every row of the current result set to `U` using `fun` and drops everything else.
335    pub async fn map_and_drop<F, U>(mut self, fun: F) -> Result<Vec<U>>
336    where
337        F: FnMut(Row) -> U,
338    {
339        let rows = self.map(fun).await?;
340        self.drop_result().await?;
341        Ok(rows)
342    }
343
344    /// Reduces rows of the current result set to `U` using `fun`.
345    ///
346    /// It will stop on the nearest result set boundary (see `QueryResult::collect` docs).
347    pub async fn reduce<T, F, U>(&mut self, mut init: U, mut fun: F) -> Result<U>
348    where
349        F: FnMut(U, T) -> U,
350        T: FromRow + Send + 'static,
351    {
352        while let Some(row) = self.next().await? {
353            init = fun(init, crate::from_row(row));
354        }
355        Ok(init)
356    }
357
358    /// Reduces rows of the current result set to `U` using `fun` and drops everything else.
359    pub async fn reduce_and_drop<T, F, U>(mut self, init: U, fun: F) -> Result<U>
360    where
361        F: FnMut(U, T) -> U,
362        T: FromRow + Send + 'static,
363    {
364        let acc = self.reduce(init, fun).await?;
365        self.drop_result().await?;
366        Ok(acc)
367    }
368
369    /// Drops this query result.
370    pub async fn drop_result(mut self) -> Result<()> {
371        loop {
372            while self.next().await?.is_some() {}
373            if !self.conn.has_pending_result() {
374                break Ok(());
375            }
376        }
377    }
378
379    /// Returns a reference to a columns list of this query result.
380    ///
381    /// Empty list means that this result set was never meant to contain rows.
382    pub fn columns_ref(&self) -> &[Column] {
383        self.conn
384            .get_pending_result()
385            .ok()
386            .flatten()
387            .map(|meta| match meta {
388                PendingResult::Pending(meta) => &meta.columns()[..],
389                PendingResult::Taken(meta) => &meta.columns()[..],
390            })
391            .unwrap_or_default()
392    }
393
394    /// Returns a copy of a columns list of this query result.
395    pub fn columns(&self) -> Option<Arc<[Column]>> {
396        self.conn
397            .get_pending_result()
398            .ok()
399            .flatten()
400            .map(|meta| match meta {
401                PendingResult::Pending(meta) => meta.columns(),
402                PendingResult::Taken(meta) => meta.columns(),
403            })
404            .cloned()
405    }
406}