persistcli/
service.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![allow(missing_docs)]
11
12use std::net::SocketAddr;
13use std::str::FromStr;
14use std::sync::Arc;
15use std::time::Duration;
16
17use bytes::Bytes;
18use futures::StreamExt;
19use mz_ore::metrics::MetricsRegistry;
20use mz_ore::now::SYSTEM_TIME;
21use mz_persist::location::{SeqNo, VersionedData};
22use mz_persist_client::ShardId;
23use mz_persist_client::cfg::PersistConfig;
24use mz_persist_client::metrics::Metrics;
25use mz_persist_client::rpc::{
26    GrpcPubSubClient, PersistGrpcPubSubServer, PersistPubSubClient, PersistPubSubClientConfig,
27};
28use tracing::info;
29
30#[derive(clap::ValueEnum, Copy, Clone, Debug)]
31pub enum Role {
32    Server,
33    Writer,
34    Reader,
35}
36
37#[derive(Debug, clap::Parser)]
38pub struct Args {
39    #[clap(long, value_name = "HOST:PORT", default_value = "127.0.0.1:6878")]
40    listen_addr: SocketAddr,
41
42    #[clap(long, value_enum)]
43    role: Role,
44
45    connect_addrs: Vec<String>,
46}
47
48pub async fn run(args: Args) -> Result<(), anyhow::Error> {
49    let shard_id = ShardId::from_str("s00000000-0000-0000-0000-000000000000").expect("shard id");
50    let config =
51        PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone());
52    match args.role {
53        Role::Server => {
54            info!("listening on {}", args.listen_addr);
55            PersistGrpcPubSubServer::new(&config, &MetricsRegistry::new())
56                .serve(args.listen_addr.clone())
57                .await
58                .expect("server running");
59            info!("server ded");
60        }
61        Role::Writer => {
62            let connection = GrpcPubSubClient::connect(
63                PersistPubSubClientConfig {
64                    url: format!("http://{}", args.listen_addr),
65                    caller_id: "writer".to_string(),
66                    persist_cfg: config.clone(),
67                },
68                Arc::new(Metrics::new(&config, &MetricsRegistry::new())),
69            );
70
71            let mut i = 0;
72            loop {
73                info!("writing");
74                connection.sender.push_diff(
75                    &shard_id,
76                    &VersionedData {
77                        seqno: SeqNo(i),
78                        data: Bytes::default(),
79                    },
80                );
81                tokio::time::sleep(Duration::from_secs(1)).await;
82                i += 1;
83            }
84        }
85        Role::Reader => {
86            let mut connection = GrpcPubSubClient::connect(
87                PersistPubSubClientConfig {
88                    url: format!("http://{}", args.listen_addr),
89                    caller_id: "reader".to_string(),
90                    persist_cfg: config.clone(),
91                },
92                Arc::new(Metrics::new(&config, &MetricsRegistry::new())),
93            );
94
95            let _token = connection.sender.subscribe(&shard_id);
96            while let Some(message) = connection.receiver.next().await {
97                info!("client res: {:?}", message);
98            }
99            info!("stream to client ded");
100        }
101    }
102    Ok(())
103}