azure_core/
pipeline.rs

1use crate::policies::TransportPolicy;
2use crate::policies::{CustomHeadersPolicy, Policy, TelemetryPolicy};
3use crate::{ClientOptions, Context, Request, Response};
4use std::sync::Arc;
5
6/// Execution pipeline.
7///
8/// A pipeline follows a precise flow:
9///
10/// 1. Client library-specified per-call policies are executed. Per-call policies can fail and bail out of the pipeline
11///    immediately.
12/// 2. User-specified per-call policies are executed.
13/// 3. Telemetry policy.
14/// 4. Retry policy. It allows to re-execute the following policies.
15/// 5. Client library-specified per-retry policies. Per-retry polices are always executed at least once but are re-executed
16///    in case of retries.
17/// 6. User-specified per-retry policies are executed.
18/// 7. Authorization policy. Authorization can depend on the HTTP headers and/or the request body so it
19///    must be executed right before sending the request to the transport. Also, the authorization
20///    can depend on the current time so it must be executed at every retry.
21/// 8. Transport policy. Transport policy is always the last policy and is the policy that
22///    actually constructs the `Response` to be passed up the pipeline.
23///
24/// A pipeline is immutable. In other words a policy can either succeed and call the following
25/// policy of fail and return to the calling policy. Arbitrary policy "skip" must be avoided (but
26/// cannot be enforced by code). All policies except Transport policy can assume there is another following policy (so
27/// `self.pipeline[0]` is always valid).
28///
29/// The `C` generic contains the pipeline-specific context. Different crates can pass
30/// different contexts using this generic. This way each crate can have its own specific pipeline
31/// context. For example, in `CosmosDB`, the generic carries the operation-specific information used by
32/// the authorization policy.
33#[derive(Debug, Clone)]
34pub struct Pipeline {
35    pipeline: Vec<Arc<dyn Policy>>,
36}
37
38impl Pipeline {
39    /// Creates a new pipeline given the client library crate name and version,
40    /// alone with user-specified and client library-specified policies.
41    ///
42    /// Crates can simply pass `option_env!("CARGO_PKG_NAME")` and `option_env!("CARGO_PKG_VERSION")` for the
43    /// `crate_name` and `crate_version` arguments respectively.
44    pub fn new(
45        crate_name: Option<&'static str>,
46        crate_version: Option<&'static str>,
47        options: ClientOptions,
48        per_call_policies: Vec<Arc<dyn Policy>>,
49        per_retry_policies: Vec<Arc<dyn Policy>>,
50    ) -> Self {
51        let mut pipeline: Vec<Arc<dyn Policy>> = Vec::with_capacity(
52            options.per_call_policies.len()
53                + per_call_policies.len()
54                + options.per_retry_policies.len()
55                + per_retry_policies.len()
56                + 3,
57        );
58
59        pipeline.extend_from_slice(&per_call_policies);
60        pipeline.extend_from_slice(&options.per_call_policies);
61
62        let telemetry_policy = TelemetryPolicy::new(crate_name, crate_version, &options.telemetry);
63        pipeline.push(Arc::new(telemetry_policy));
64
65        pipeline.push(Arc::new(CustomHeadersPolicy::default()));
66
67        let retry_policy = options.retry.to_policy();
68        pipeline.push(retry_policy);
69
70        pipeline.extend_from_slice(&per_retry_policies);
71        pipeline.extend_from_slice(&options.per_retry_policies);
72
73        let transport: Arc<dyn Policy> = Arc::new(TransportPolicy::new(options.transport.clone()));
74
75        pipeline.push(transport);
76
77        Self { pipeline }
78    }
79
80    pub fn replace_policy(&mut self, policy: Arc<dyn Policy>, position: usize) -> Arc<dyn Policy> {
81        std::mem::replace(&mut self.pipeline[position], policy)
82    }
83
84    pub fn policies(&self) -> &[Arc<dyn Policy>] {
85        &self.pipeline
86    }
87
88    pub async fn send(&self, ctx: &Context, request: &mut Request) -> crate::Result<Response> {
89        self.pipeline[0]
90            .send(ctx, request, &self.pipeline[1..])
91            .await
92    }
93}