persistcli/
service.rs
1#![allow(missing_docs)]
11
12use std::net::SocketAddr;
13use std::str::FromStr;
14use std::sync::Arc;
15use std::time::Duration;
16
17use bytes::Bytes;
18use futures::StreamExt;
19use mz_ore::metrics::MetricsRegistry;
20use mz_ore::now::SYSTEM_TIME;
21use mz_persist::location::{SeqNo, VersionedData};
22use mz_persist_client::ShardId;
23use mz_persist_client::cfg::PersistConfig;
24use mz_persist_client::metrics::Metrics;
25use mz_persist_client::rpc::{
26 GrpcPubSubClient, PersistGrpcPubSubServer, PersistPubSubClient, PersistPubSubClientConfig,
27};
28use tracing::info;
29
30#[derive(clap::ValueEnum, Copy, Clone, Debug)]
31pub enum Role {
32 Server,
33 Writer,
34 Reader,
35}
36
37#[derive(Debug, clap::Parser)]
38pub struct Args {
39 #[clap(long, value_name = "HOST:PORT", default_value = "127.0.0.1:6878")]
40 listen_addr: SocketAddr,
41
42 #[clap(long, value_enum)]
43 role: Role,
44
45 connect_addrs: Vec<String>,
46}
47
48pub async fn run(args: Args) -> Result<(), anyhow::Error> {
49 let shard_id = ShardId::from_str("s00000000-0000-0000-0000-000000000000").expect("shard id");
50 let config =
51 PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone());
52 match args.role {
53 Role::Server => {
54 info!("listening on {}", args.listen_addr);
55 PersistGrpcPubSubServer::new(&config, &MetricsRegistry::new())
56 .serve(args.listen_addr.clone())
57 .await
58 .expect("server running");
59 info!("server ded");
60 }
61 Role::Writer => {
62 let connection = GrpcPubSubClient::connect(
63 PersistPubSubClientConfig {
64 url: format!("http://{}", args.listen_addr),
65 caller_id: "writer".to_string(),
66 persist_cfg: config.clone(),
67 },
68 Arc::new(Metrics::new(&config, &MetricsRegistry::new())),
69 );
70
71 let mut i = 0;
72 loop {
73 info!("writing");
74 connection.sender.push_diff(
75 &shard_id,
76 &VersionedData {
77 seqno: SeqNo(i),
78 data: Bytes::default(),
79 },
80 );
81 tokio::time::sleep(Duration::from_secs(1)).await;
82 i += 1;
83 }
84 }
85 Role::Reader => {
86 let mut connection = GrpcPubSubClient::connect(
87 PersistPubSubClientConfig {
88 url: format!("http://{}", args.listen_addr),
89 caller_id: "reader".to_string(),
90 persist_cfg: config.clone(),
91 },
92 Arc::new(Metrics::new(&config, &MetricsRegistry::new())),
93 );
94
95 let _token = connection.sender.subscribe(&shard_id);
96 while let Some(message) = connection.receiver.next().await {
97 info!("client res: {:?}", message);
98 }
99 info!("stream to client ded");
100 }
101 }
102 Ok(())
103}