mysql_async/queryable/query_result/mod.rs
1// Copyright (c) 2017 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use mysql_common::row::convert::FromRowError;
10
11use std::{borrow::Cow, fmt, marker::PhantomData, result::Result as StdResult, sync::Arc};
12
13use crate::{
14 conn::{routines::NextSetRoutine, PendingResult},
15 connection_like::Connection,
16 error::*,
17 prelude::{FromRow, Protocol},
18 Column, Row,
19};
20
21pub mod result_set_stream;
22mod tests;
23
24/// Result set metadata.
25#[derive(Debug, Clone, Eq, PartialEq)]
26pub enum ResultSetMeta {
27 /// Text result set, that may contain rows.
28 Text(Arc<[Column]>),
29 /// Binary result set, that may contain rows.
30 Binary(Arc<[Column]>),
31}
32
33impl ResultSetMeta {
34 fn columns(&self) -> &Arc<[Column]> {
35 match self {
36 ResultSetMeta::Text(cols) | ResultSetMeta::Binary(cols) => cols,
37 }
38 }
39}
40
41/// Result of a query or statement execution.
42///
43/// Represents an asynchronous query result, that may not be fully consumed.
44///
45/// # Note
46///
47/// Unconsumed query results are dropped implicitly when corresponding connection
48/// is dropped or queried. Also note, that in this case all remaining errors will be
49/// emitted to the caller:
50///
51/// ```rust
52/// # use mysql_async::test_misc::get_opts;
53/// # #[tokio::main]
54/// # async fn main() -> mysql_async::Result<()> {
55/// use mysql_async::*;
56/// use mysql_async::prelude::*;
57/// let mut conn = Conn::new(get_opts()).await?;
58///
59/// // second result set will contain an error,
60/// // but the first result set is ok, so this line will pass
61/// conn.query_iter("DO 1; BLABLA;").await?;
62/// // `QueryResult` was dropped withot being consumed
63///
64/// // driver must cleanup any unconsumed result to perform another query on `conn`,
65/// // so this operation will be performed implicitly, but the unconsumed result
66/// // contains an error claiming about 'BLABLA', so this error will be emitted here:
67/// assert!(conn.query_iter("DO 1").await.unwrap_err().to_string().contains("BLABLA"));
68///
69/// # conn.disconnect().await }
70/// ```
71pub struct QueryResult<'a, 't: 'a, P> {
72 conn: Connection<'a, 't>,
73 __phantom: PhantomData<P>,
74}
75
76impl<'a, 't: 'a, P> fmt::Debug for QueryResult<'a, 't, P> {
77 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78 f.debug_struct("QueryResult")
79 .field("conn", &self.conn)
80 .field("__phantom", &"PhantomData<P>")
81 .finish()
82 }
83}
84
85impl<'a, 't: 'a, P> QueryResult<'a, 't, P>
86where
87 P: Protocol,
88{
89 pub fn new<T: Into<Connection<'a, 't>>>(conn: T) -> Self {
90 QueryResult {
91 conn: conn.into(),
92 __phantom: PhantomData,
93 }
94 }
95
96 /// Returns `true` if this query result may contain rows.
97 ///
98 /// If `false` then no rows possible for this query tesult (e.g. result of an UPDATE query).
99 fn has_rows(&self) -> bool {
100 self.conn
101 .get_pending_result()
102 .map(|pending_result| match pending_result {
103 Some(PendingResult::Pending(meta)) => meta.columns().len() > 0,
104 Some(PendingResult::Taken(meta)) => meta.columns().len() > 0,
105 None => false,
106 })
107 .unwrap_or(false)
108 }
109
110 /// `true` if there are no more rows nor result sets in this query.
111 ///
112 /// This function will return `false` if the last result set was taken
113 /// by the [`QueryResult::stream`] that was dropped before being fully consumed
114 /// (i.e. caller will get `false` even if QueryResult data is reachable only for library internals).
115 pub fn is_empty(&self) -> bool {
116 !self.has_rows() && !self.conn.more_results_exists()
117 }
118
119 /// Low-level function that reads a result set row.
120 ///
121 /// Returns `None` if there are no more rows in the current set.
122 async fn next_row(&mut self, columns: Arc<[Column]>) -> crate::Result<Option<Row>> {
123 let mut row = None;
124
125 if columns.is_empty() {
126 // Empty, but not yet consumed result set.
127 self.conn.set_pending_result(None)?;
128 } else {
129 // Not yet consumed non-empty result set.
130 let packet = match self.conn.read_packet().await {
131 Ok(packet) => packet,
132 Err(err) => {
133 // Next row contained an error. No more data will follow.
134 self.conn.set_pending_result(None)?;
135 return Err(err);
136 }
137 };
138
139 if P::is_last_result_set_packet(self.conn.capabilities(), &packet) {
140 // `packet` is a result set terminator.
141 self.conn.set_pending_result(None)?;
142 } else {
143 // `packet` is a result set row.
144 row = Some(P::read_result_set_row(&packet, columns)?);
145 }
146 }
147
148 Ok(row)
149 }
150
151 /// Low-level function that jumps to the next result set.
152 ///
153 /// Returns `false` if there are no more result sets.
154 async fn next_set(&mut self) -> crate::Result<bool> {
155 if self.conn.more_results_exists() {
156 // More data will follow.
157 self.conn.routine(NextSetRoutine::<P>::new()).await?;
158 }
159 Ok(self.conn.has_pending_result())
160 }
161
162 /// Low-level function that reads a next row and tries to jump
163 /// to the next result set if the current one is exhausted.
164 async fn next_row_or_next_set(&mut self, meta: ResultSetMeta) -> crate::Result<Option<Row>> {
165 let columns = meta.columns().clone();
166
167 self.next_row_or_next_set2(columns).await
168 }
169
170 /// Low-level function that reads a next row and tries to jump
171 /// to the next result set if the current one is exhausted.
172 async fn next_row_or_next_set2(
173 &mut self,
174 columns: Arc<[Column]>,
175 ) -> crate::Result<Option<Row>> {
176 if let Some(row) = self.next_row(columns).await? {
177 Ok(Some(row))
178 } else {
179 self.next_set().await?;
180 Ok(None)
181 }
182 }
183
184 /// Skips the taken result set.
185 async fn skip_taken(&mut self, meta: Arc<ResultSetMeta>) -> crate::Result<()> {
186 while (self.next_row_or_next_set((*meta).clone()).await?).is_some() {}
187 Ok(())
188 }
189
190 #[doc(hidden)]
191 pub async fn next(&mut self) -> Result<Option<Row>> {
192 loop {
193 match self.conn.use_pending_result()?.cloned() {
194 Some(PendingResult::Pending(meta)) => return self.next_row_or_next_set(meta).await,
195 Some(PendingResult::Taken(meta)) => self.skip_taken(meta).await?,
196 None => return Ok(None),
197 }
198 }
199 }
200
201 /// Last insert id, if any.
202 pub fn last_insert_id(&self) -> Option<u64> {
203 self.conn.last_insert_id()
204 }
205
206 /// Number of affected rows as reported by the server, or `0`.
207 pub fn affected_rows(&self) -> u64 {
208 self.conn.affected_rows()
209 }
210
211 /// Text information as reported by the server, or an empty string.
212 pub fn info(&self) -> Cow<'_, str> {
213 self.conn.info()
214 }
215
216 /// Number of warnings as reported by the server, or `0`.
217 pub fn warnings(&self) -> u16 {
218 self.conn.get_warnings()
219 }
220
221 /// Collects the current result set of this query result.
222 ///
223 /// It is parametrized by `R` and internally calls `R::from_row(Row)` on each row.
224 ///
225 /// It will collect rows up to a neares result set boundary. This means that you should call
226 /// `collect` as many times as result sets in your query result. For example query
227 /// `SELECT 'foo'; SELECT 'foo', 'bar';` will produce `QueryResult` with two result sets in it.
228 /// One can use `QueryResult::is_empty` to make sure that there is no more result sets.
229 ///
230 /// # Panic
231 ///
232 /// It'll panic if any row isn't convertible to `R` (i.e. programmer error or unknown schema).
233 /// * In case of programmer error see [`FromRow`] docs;
234 /// * In case of unknown schema use [`QueryResult::try_collect`].
235 pub async fn collect<R>(&mut self) -> Result<Vec<R>>
236 where
237 R: FromRow + Send + 'static,
238 {
239 self.reduce(Vec::new(), |mut acc, row| {
240 acc.push(FromRow::from_row(row));
241 acc
242 })
243 .await
244 }
245
246 /// Collects the current result set of this query result.
247 ///
248 /// It works the same way as [`QueryResult::collect`] but won't panic if row isn't convertible
249 /// to `R`.
250 pub async fn try_collect<R>(&mut self) -> Result<Vec<StdResult<R, FromRowError>>>
251 where
252 R: FromRow + Send + 'static,
253 {
254 self.reduce(Vec::new(), |mut acc, row| {
255 acc.push(FromRow::from_row_opt(row));
256 acc
257 })
258 .await
259 }
260
261 /// Collects the current result set of this query result and drops everything else.
262 ///
263 /// # Panic
264 ///
265 /// It'll panic if any row isn't convertible to `R` (i.e. programmer error or unknown schema).
266 /// * In case of programmer error see `FromRow` docs;
267 /// * In case of unknown schema use [`QueryResult::try_collect`].
268 pub async fn collect_and_drop<R>(mut self) -> Result<Vec<R>>
269 where
270 R: FromRow + Send + 'static,
271 {
272 let output = self.collect::<R>().await?;
273 self.drop_result().await?;
274 Ok(output)
275 }
276
277 /// Collects the current result set of this query result and drops everything else.
278 ///
279 /// It works the same way as [`QueryResult::collect_and_drop`] but won't panic if row isn't
280 /// convertible to `R`.
281 pub async fn try_collect_and_drop<R>(mut self) -> Result<Vec<StdResult<R, FromRowError>>>
282 where
283 R: FromRow + Send + 'static,
284 {
285 let output = self.try_collect().await?;
286 self.drop_result().await?;
287 Ok(output)
288 }
289
290 /// Executes `fun` on every row of the current result set.
291 ///
292 /// It will stop on the nearest result set boundary (see `QueryResult::collect` docs).
293 pub async fn for_each<F>(&mut self, mut fun: F) -> Result<()>
294 where
295 F: FnMut(Row),
296 {
297 if self.is_empty() {
298 Ok(())
299 } else {
300 while let Some(row) = self.next().await? {
301 fun(row);
302 }
303 Ok(())
304 }
305 }
306
307 /// Executes `fun` on every row of the current result set and drops everything else.
308 pub async fn for_each_and_drop<F>(mut self, fun: F) -> Result<()>
309 where
310 F: FnMut(Row),
311 {
312 self.for_each(fun).await?;
313 self.drop_result().await?;
314 Ok(())
315 }
316
317 /// Maps every row of the current result set to `U` using `fun`.
318 ///
319 /// It will stop on the nearest result set boundary (see `QueryResult::collect` docs).
320 pub async fn map<F, U>(&mut self, mut fun: F) -> Result<Vec<U>>
321 where
322 F: FnMut(Row) -> U,
323 {
324 let mut acc = Vec::new();
325 while let Some(row) = self.next().await? {
326 acc.push(fun(crate::from_row(row)));
327 }
328 Ok(acc)
329 }
330
331 /// Map every row of the current result set to `U` using `fun` and drops everything else.
332 pub async fn map_and_drop<F, U>(mut self, fun: F) -> Result<Vec<U>>
333 where
334 F: FnMut(Row) -> U,
335 {
336 let rows = self.map(fun).await?;
337 self.drop_result().await?;
338 Ok(rows)
339 }
340
341 /// Reduces rows of the current result set to `U` using `fun`.
342 ///
343 /// It will stop on the nearest result set boundary (see `QueryResult::collect` docs).
344 pub async fn reduce<T, F, U>(&mut self, mut init: U, mut fun: F) -> Result<U>
345 where
346 F: FnMut(U, T) -> U,
347 T: FromRow + Send + 'static,
348 {
349 while let Some(row) = self.next().await? {
350 init = fun(init, crate::from_row(row));
351 }
352 Ok(init)
353 }
354
355 /// Reduces rows of the current result set to `U` using `fun` and drops everything else.
356 pub async fn reduce_and_drop<T, F, U>(mut self, init: U, fun: F) -> Result<U>
357 where
358 F: FnMut(U, T) -> U,
359 T: FromRow + Send + 'static,
360 {
361 let acc = self.reduce(init, fun).await?;
362 self.drop_result().await?;
363 Ok(acc)
364 }
365
366 /// Drops this query result.
367 pub async fn drop_result(mut self) -> Result<()> {
368 loop {
369 while self.next().await?.is_some() {}
370 if !self.conn.has_pending_result() {
371 break Ok(());
372 }
373 }
374 }
375
376 /// Returns a reference to a columns list of this query result.
377 ///
378 /// Empty list means that this result set was never meant to contain rows.
379 pub fn columns_ref(&self) -> &[Column] {
380 self.conn
381 .get_pending_result()
382 .ok()
383 .flatten()
384 .map(|meta| match meta {
385 PendingResult::Pending(meta) => &meta.columns()[..],
386 PendingResult::Taken(meta) => &meta.columns()[..],
387 })
388 .unwrap_or_default()
389 }
390
391 /// Returns a copy of a columns list of this query result.
392 pub fn columns(&self) -> Option<Arc<[Column]>> {
393 self.conn
394 .get_pending_result()
395 .ok()
396 .flatten()
397 .map(|meta| match meta {
398 PendingResult::Pending(meta) => meta.columns(),
399 PendingResult::Taken(meta) => meta.columns(),
400 })
401 .cloned()
402 }
403}