persistcli/
service.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

#![allow(missing_docs)]

use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use futures::StreamExt;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_persist::location::{SeqNo, VersionedData};
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::metrics::Metrics;
use mz_persist_client::rpc::{
    GrpcPubSubClient, PersistGrpcPubSubServer, PersistPubSubClient, PersistPubSubClientConfig,
};
use mz_persist_client::ShardId;
use tracing::info;

// Which part of the PubSub smoke test this process plays; selected on the
// command line through the `role` field of `Args`.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn
// `///` doc comments into help text, so doc comments here would change the
// generated CLI output.
#[derive(clap::ValueEnum, Copy, Clone, Debug)]
pub enum Role {
    // Serve a `PersistGrpcPubSubServer` on the configured address.
    Server,
    // Connect as a client and push one diff per second for a fixed shard.
    Writer,
    // Connect as a client, subscribe to the fixed shard, and log every
    // message that arrives.
    Reader,
}

// Command-line arguments consumed by `run`.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn
// `///` doc comments into help text, so doc comments here would change the
// generated CLI output.
#[derive(Debug, clap::Parser)]
pub struct Args {
    // Address of the PubSub server. Despite the name it is used by every role:
    // the server role serves on it, while the writer and reader roles connect
    // to `http://<listen_addr>`.
    #[clap(long, value_name = "HOST:PORT", default_value = "127.0.0.1:6878")]
    listen_addr: SocketAddr,

    // Role this process plays (see `Role`).
    #[clap(long, value_enum)]
    role: Role,

    // Carries no `#[clap(...)]` attribute, so presumably parsed as positional
    // arguments (TODO confirm). Not read by `run` in this file.
    connect_addrs: Vec<String>,
}

/// Runs a persist PubSub smoke-test process in the role chosen by `args.role`.
///
/// All roles operate on the same hard-coded, all-zero shard id and use the
/// default persist configuration.
///
/// * [`Role::Server`] serves a `PersistGrpcPubSubServer` on `args.listen_addr`
///   and returns once the server stops.
/// * [`Role::Writer`] connects to `http://{args.listen_addr}` and pushes an
///   empty diff with an incrementing sequence number once per second. This
///   role loops forever.
/// * [`Role::Reader`] connects to `http://{args.listen_addr}`, subscribes to
///   the shard, and logs each received message until the stream ends.
///
/// # Errors
///
/// Returns the error reported by the PubSub server if serving fails.
///
/// # Panics
///
/// Panics only if the hard-coded shard id fails to parse, which would be a bug
/// in this file.
pub async fn run(args: Args) -> Result<(), anyhow::Error> {
    let shard_id = ShardId::from_str("s00000000-0000-0000-0000-000000000000").expect("shard id");
    let config =
        PersistConfig::new_default_configs(&mz_persist_client::BUILD_INFO, SYSTEM_TIME.clone());
    match args.role {
        Role::Server => {
            info!("listening on {}", args.listen_addr);
            // `SocketAddr` is `Copy`, so it is passed by value without a clone.
            // A serve failure is handed back to the caller instead of panicking,
            // since this function already returns a `Result`.
            PersistGrpcPubSubServer::new(&config, &MetricsRegistry::new())
                .serve(args.listen_addr)
                .await?;
            info!("server ded");
        }
        Role::Writer => {
            let connection = GrpcPubSubClient::connect(
                PersistPubSubClientConfig {
                    url: format!("http://{}", args.listen_addr),
                    caller_id: "writer".to_string(),
                    persist_cfg: config.clone(),
                },
                Arc::new(Metrics::new(&config, &MetricsRegistry::new())),
            );

            // Unbounded counter: one empty diff per second, each with the next
            // sequence number.
            for seqno in 0.. {
                info!("writing");
                connection.sender.push_diff(
                    &shard_id,
                    &VersionedData {
                        seqno: SeqNo(seqno),
                        data: Bytes::default(),
                    },
                );
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
        }
        Role::Reader => {
            let mut connection = GrpcPubSubClient::connect(
                PersistPubSubClientConfig {
                    url: format!("http://{}", args.listen_addr),
                    caller_id: "reader".to_string(),
                    persist_cfg: config.clone(),
                },
                Arc::new(Metrics::new(&config, &MetricsRegistry::new())),
            );

            // Bind the value returned by `subscribe` so it stays alive for the
            // whole receive loop rather than being dropped immediately.
            let _token = connection.sender.subscribe(&shard_id);
            while let Some(message) = connection.receiver.next().await {
                info!("client res: {:?}", message);
            }
            info!("stream to client ded");
        }
    }
    Ok(())
}