Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Pedro/tap 183 add grpc server for aggregation #249

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion tap_aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,13 @@ axum = { version = "0.7.5", features = [
futures-util = "0.3.28"
lazy_static = "1.4.0"
ruint = "1.10.1"
tower = { version = "0.4", features = ["util"] }
tower = { version = "0.4", features = ["util", "steer"] }
tonic = { version = "0.12.3", features = ["transport"] }
prost = "0.13.3"

[build-dependencies]
tonic-build = "0.12.3"


[dev-dependencies]
jsonrpsee = { workspace = true, features = ["http-client", "jsonrpsee-core"] }
Expand Down
8 changes: 8 additions & 0 deletions tap_aggregator/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Running build.rs...");
let out_dir = std::env::var("OUT_DIR").expect("OUT_DIR not set by Cargo");
println!("OUT_DIR: {}", out_dir); // This should print the output directory

tonic_build::compile_protos("./proto/tap_aggregator.proto")?;
Ok(())
}
58 changes: 58 additions & 0 deletions tap_aggregator/proto/tap_aggregator.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
syntax = "proto3";
package tap_aggregator.v1;

message Receipt {
bytes allocation_id = 1;
uint64 timestamp_ns = 2;
uint64 nonce = 3;
Uint128 value = 4;
}

message SignedReceipt {
Receipt message = 1;
bytes signature = 2;
}

message ReceiptAggregateVoucher {
bytes allocation_id = 1;
uint64 timestamp_ns = 2;
Uint128 value_aggregate = 3;
}

message SignedRav {
ReceiptAggregateVoucher message = 1;
bytes signature = 2;
}

message RavRequest {
repeated SignedReceipt receipts = 1;
optional SignedRav previous_rav = 2;
}

message RavResponse {
SignedRav rav = 1;
}

message TapRpcApiVersion {
string version = 1;
}

message TapRpcApiVersionsInfo {
repeated TapRpcApiVersion versions_supported = 1;
repeated TapRpcApiVersion versions_deprecated = 2;
}

// Optional request message for ApiVersions (TODO: should we use use google.protobuf.Empty?)
message ApiVersionsRequest {}

service TapAggregator {
rpc ApiVersions(ApiVersionsRequest) returns (TapRpcApiVersionsInfo);
rpc AggregateReceipts(RavRequest) returns (RavResponse);
}

message Uint128 {
// Highest 64 bits of a 128 bit number.
uint64 high = 1;
// Lowest 64 bits of a 128 bit number.
uint64 low = 2;
}
45 changes: 45 additions & 0 deletions tap_aggregator/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use anyhow::Result;
use jsonrpsee::{proc_macros::rpc, server::ServerBuilder, server::ServerHandle};
use lazy_static::lazy_static;
use prometheus::{register_counter, register_int_counter, Counter, IntCounter};
// use tower::steer::Steer;

use crate::aggregator::check_and_aggregate_receipts;
use crate::api_versioning::{
Expand All @@ -22,6 +23,10 @@ use tap_core::{
rav::ReceiptAggregateVoucher, receipt::Receipt, signed_message::EIP712SignedMessage,
};

pub mod tap_aggregator {
tonic::include_proto!("tap_aggregator.v1");
}

// Register the metrics into the global metrics registry.
lazy_static! {
static ref AGGREGATION_SUCCESS_COUNTER: IntCounter = register_int_counter!(
Expand Down Expand Up @@ -171,6 +176,45 @@ fn aggregate_receipts_(
}
}

use tap_aggregator::tap_aggregator_server::TapAggregator;
use tap_aggregator::{
ApiVersionsRequest, RavRequest, RavResponse, SignedRav,
TapRpcApiVersionsInfo as TapGRpcApiVersionsInfo,
};
use tonic::{Request, Response, Status};

#[tonic::async_trait]
impl TapAggregator for RpcImpl {
async fn api_versions(
&self,
_request: Request<ApiVersionsRequest>,
) -> Result<Response<TapGRpcApiVersionsInfo>, Status> {
// Example response
let response = TapGRpcApiVersionsInfo {
versions_deprecated: vec![],
versions_supported: vec![],
};

Ok(Response::new(response))
}

async fn aggregate_receipts(
&self,
request: Request<RavRequest>,
) -> Result<Response<RavResponse>, Status> {
let rav_request = request.into_inner();

// Example implementation: create a dummy response
let response = RavResponse {
rav: Some(SignedRav {
message: None, // Fill this with your aggregate logic
signature: vec![],
}),
};
Ok(Response::new(response))
}
}

impl RpcServer for RpcImpl {
fn api_versions(&self) -> JsonRpcResult<TapRpcApiVersionsInfo> {
Ok(JsonRpcResponse::ok(tap_rpc_api_versions_info()))
Expand Down Expand Up @@ -208,6 +252,7 @@ impl RpcServer for RpcImpl {
}
}

// TODO: create a gRPC alike run_server
pub async fn run_server(
port: u16,
wallet: PrivateKeySigner,
Expand Down
89 changes: 89 additions & 0 deletions tap_aggregator/src/tap_aggregator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use anyhow::anyhow;
use tap_core::signed_message::EIP712SignedMessage;

tonic::include_proto!("tap_aggregator.v1");

impl TryFrom<Receipt> for tap_core::receipt::Receipt {
type Error = anyhow::Error;
fn try_from(receipt: Receipt) -> Result<Self, Self::Error> {
Ok(Self {
allocation_id: receipt.allocation_id.as_slice().try_into()?,
timestamp_ns: receipt.timestamp_ns,
value: receipt.value.ok_or(anyhow!("Missing value"))?.into(),
nonce: receipt.nonce,
})
}
}

impl TryFrom<SignedReceipt> for tap_core::receipt::SignedReceipt {
type Error = anyhow::Error;
fn try_from(receipt: SignedReceipt) -> Result<Self, Self::Error> {
Ok(Self {
signature: receipt.signature.as_slice().try_into()?,
message: receipt
.message
.ok_or(anyhow!("Missing message"))?
.try_into()?,
})
}
}

impl TryFrom<SignedRav> for EIP712SignedMessage<tap_core::rav::ReceiptAggregateVoucher> {
type Error = anyhow::Error;
fn try_from(voucher: SignedRav) -> Result<Self, Self::Error> {
Ok(Self {
signature: voucher.signature.as_slice().try_into()?,
message: voucher
.message
.ok_or(anyhow!("Missing message"))?
.try_into()?,
})
}
}

impl From<EIP712SignedMessage<tap_core::rav::ReceiptAggregateVoucher>> for SignedRav {
fn from(voucher: EIP712SignedMessage<tap_core::rav::ReceiptAggregateVoucher>) -> Self {
Self {
signature: voucher.signature.as_bytes().to_vec(),
message: Some(voucher.message.into()),
}
}
}

impl TryFrom<ReceiptAggregateVoucher> for tap_core::rav::ReceiptAggregateVoucher {
type Error = anyhow::Error;
fn try_from(voucher: ReceiptAggregateVoucher) -> Result<Self, Self::Error> {
Ok(Self {
allocationId: voucher.allocation_id.as_slice().try_into()?,
timestampNs: voucher.timestamp_ns,
valueAggregate: voucher
.value_aggregate
.ok_or(anyhow!("Missing Value Aggregate"))?
.into(),
})
}
}

impl From<tap_core::rav::ReceiptAggregateVoucher> for ReceiptAggregateVoucher {
fn from(voucher: tap_core::rav::ReceiptAggregateVoucher) -> Self {
Self {
allocation_id: voucher.allocationId.to_vec(),
timestamp_ns: voucher.timestampNs,
value_aggregate: Some(voucher.valueAggregate.into()),
}
}
}

impl From<Uint128> for u128 {
fn from(Uint128 { high, low }: Uint128) -> Self {
((high as u128) << 64) | low as u128
}
}

impl From<u128> for Uint128 {
fn from(value: u128) -> Self {
let high = (value >> 64) as u64;
let low = value as u64;
Self { high, low }
}
}
Loading