Skip to content

Commit

Permalink
grpc server scaffolding (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
suurkivi authored Nov 6, 2024
1 parent acc1741 commit 639bd85
Show file tree
Hide file tree
Showing 9 changed files with 334 additions and 14 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ tokio = { version = "1.40.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = "0.10.6"
tonic = "0.9.2"
prost = "0.11.9"
tonic = "0.12.3"
prost = "0.13.3"
futures = "0.3.28"
parking_lot = "0.12.1"
clap = { version = "4.3.0", features = ["derive"] }
libp2p = { version = "0.54.1", features = ["tokio", "gossipsub", "mdns", "noise", "macros", "tcp", "yamux", "quic"] }
async-trait = "0.1.68"
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt", "json"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt", "json"] }
hex = "0.4.3"
ractor = "0.11.2"
malachite-consensus = { path = "../malachite/code/crates/consensus" }
Expand Down
10 changes: 8 additions & 2 deletions build.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
// tonic_build::compile_protos("src/proto/blocks.proto")?;
let mut builder = tonic_build::configure();

// Custom type attributes required for malachite
builder = builder.type_attribute("snapchain.ShardHash", "#[derive(Eq, PartialOrd, Ord)]");

builder.compile(&["src/proto/blocks.proto"], &["src/proto"])?;
// TODO: auto-discover proto files
builder.compile(&[
"src/proto/blocks.proto",
"src/proto/rpc.proto",
"src/proto/message.proto",
"src/proto/username_proof.proto",
], &["src/proto"])?;

Ok(())
}
60 changes: 52 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,39 @@ pub mod network;
pub mod connectors;
mod cfg;

use std::error::Error;
use std::io;
use std::net::SocketAddr;
use clap::Parser;
use futures::stream::StreamExt;
use libp2p::identity::ed25519::Keypair;
use malachite_config::TimeoutConfig;
use malachite_metrics::{Metrics, SharedRegistry};
use std::time::Duration;
use tokio::signal::ctrl_c;
use tokio::sync::mpsc;
use tokio::{select, time};
use tokio::time::sleep;
use tonic::transport::Server;
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
use connectors::fname::Fetcher;


use crate::consensus::consensus::{Consensus, ConsensusMsg, ConsensusParams};
use crate::core::types::{proto, Address, Height, ShardId, SnapchainShard, SnapchainValidator, SnapchainValidatorContext, SnapchainValidatorSet};
use crate::network::gossip::{GossipEvent};
use crate::network::gossip::GossipEvent;
use network::gossip::SnapchainGossip;
use network::server::MySnapchainService;
use network::server::rpc::snapchain_service_server::SnapchainServiceServer;

pub enum SystemMessage {
Consensus(ConsensusMsg<SnapchainValidatorContext>),
}


#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
async fn main() -> Result<(), Box<dyn Error>> {
let args: Vec<String> = std::env::args().collect();

let app_config = cfg::load_and_merge_config(args)?;
Expand All @@ -39,7 +49,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let port = base_port + app_config.id;
let addr = format!("/ip4/0.0.0.0/udp/{}/quic-v1", port);

println!("SnapchainService (ID: {}) listening on {}", app_config.id, addr);
let base_grpc_port = 50060;
let grpc_port = base_grpc_port + app_config.id;
let grpc_addr = format!("0.0.0.0:{}", grpc_port);
let grpc_socket_addr: SocketAddr = grpc_addr.parse()?;

info!(
id = app_config.id,
addr = addr,
grpc_addr = grpc_addr,
"SnapchainService listening",
);

let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
match app_config.log_format.as_str() {
Expand All @@ -60,17 +80,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let gossip_result = SnapchainGossip::create(keypair.clone(), addr, system_tx.clone());
if let Err(e) = gossip_result {
println!("Failed to create SnapchainGossip: {:?}", e);
error!(error = ?e, "Failed to create SnapchainGossip");
return Ok(());
}

let mut gossip = gossip_result?;
let gossip_tx = gossip.tx.clone();

tokio::spawn(async move {
println!("Starting gossip");
info!("Starting gossip");
gossip.start().await;
println!("Gossip Stopped");
info!("Gossip Stopped");
});

if !app_config.fnames.disable {
Expand All @@ -81,6 +101,25 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});
}

let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);

tokio::spawn(async move {
let service = MySnapchainService::default();

let resp = Server::builder()
.add_service(SnapchainServiceServer::new(service))
.serve(grpc_socket_addr)
.await;

let msg = "grpc server stopped";
match resp {
Ok(()) => error!(msg),
Err(e) => error!(error = ?e, "{}", msg),
}

shutdown_tx.send(()).await.ok();
});

let registry = SharedRegistry::global();
let metrics = Metrics::register(registry);

Expand Down Expand Up @@ -119,7 +158,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
loop {
select! {
_ = ctrl_c() => {
println!("Received Ctrl-C, shutting down");
info!("Received Ctrl-C, shutting down");
consensus_actor.stop(None);
return Ok(());
}
_ = shutdown_rx.recv() => {
error!("Received shutdown signal, shutting down");
consensus_actor.stop(None);
return Ok(());
}
Expand All @@ -134,7 +178,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}),
nonce: tick_count as u64, // Need the nonce to avoid the gossip duplicate message check
};
println!("Registering validator with nonce: {}", register_validator.nonce);
info!("Registering validator with nonce: {}", register_validator.nonce);
gossip_tx.send(GossipEvent::RegisterValidator(register_validator)).await?;
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/network/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::time::Duration;
use tokio::io;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tracing::info;

pub enum GossipEvent<Ctx: SnapchainContext> {
BroadcastSignedVote(SignedVote<Ctx>),
Expand Down Expand Up @@ -124,7 +125,7 @@ impl SnapchainGossip {
SwarmEvent::Behaviour(SnapchainBehaviorEvent::Gossipsub(gossipsub::Event::Unsubscribed { peer_id, topic })) =>
println!("Peer: {peer_id} unsubscribed to topic: {topic}"),
SwarmEvent::NewListenAddr { address, .. } => {
println!("Local node is listening on {address}");
info!(address = address.to_string(), "Local node is listening");
},
SwarmEvent::Behaviour(SnapchainBehaviorEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: peer_id,
Expand Down
1 change: 1 addition & 0 deletions src/network/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod gossip;
pub mod server;
34 changes: 34 additions & 0 deletions src/network/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::error::Error;
use std::net::SocketAddr;
use tonic::{transport::Server, Request, Response, Status};
use tonic::Code::Unimplemented;
use tracing::{info};
use hex::ToHex;


pub mod rpc {
tonic::include_proto!("rpc");
}

pub mod message {
tonic::include_proto!("message");
}

pub mod username_proof {
tonic::include_proto!("username_proof");
}

use rpc::snapchain_service_server::{SnapchainService, SnapchainServiceServer};
use message::{Message};

#[derive(Default)]
pub struct MySnapchainService;

#[tonic::async_trait]
impl SnapchainService for MySnapchainService {
async fn submit_message(&self, request: Request<Message>) -> Result<Response<Message>, Status> {
let hash = request.get_ref().hash.encode_hex::<String>();
info!(hash, "Received a message");
Err(Status::new(Unimplemented, "not implemented"))
}
}
Loading

0 comments on commit 639bd85

Please sign in to comment.