Skip to content

Commit

Permalink
feat: accept grpc requests in tap-aggregator (#253)
Browse files Browse the repository at this point in the history
* feat: add proto definitions

Signed-off-by: Gustavo Inacio <[email protected]>

* feat: gRPC service implementation

Signed-off-by: Gustavo Inacio <[email protected]>
Signed-off-by: pedro bufulin <[email protected]>

* ci: add protobuf compiler

Signed-off-by: Gustavo Inacio <[email protected]>

* build: add protobuf compiler to dockerfile

Signed-off-by: Gustavo Inacio <[email protected]>

---------

Signed-off-by: Gustavo Inacio <[email protected]>
Signed-off-by: pedro bufulin <[email protected]>
Co-authored-by: pedro bufulin <[email protected]>
  • Loading branch information
gusinacio and pedrohba1 authored Dec 27, 2024
1 parent a6c5193 commit 3c56018
Show file tree
Hide file tree
Showing 11 changed files with 512 additions and 93 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ jobs:
image: rust:1.83-bookworm
steps:
- uses: actions/checkout@v3
- name: Install protobuf compiler
run: apt-get update && apt-get install protobuf-compiler -y
- uses: actions/cache@v3
with:
path: |
Expand All @@ -50,6 +52,8 @@ jobs:
image: rust:1.83-bookworm
steps:
- uses: actions/checkout@v3
- name: Install protobuf compiler
run: apt-get update && apt-get install protobuf-compiler -y
- uses: actions/cache@v3
with:
path: |
Expand All @@ -75,6 +79,8 @@ jobs:
image: rust:1.83-bookworm
steps:
- uses: actions/checkout@v3
- name: Install protobuf compiler
run: apt-get update && apt-get install protobuf-compiler -y
- uses: actions/cache@v3
with:
path: |
Expand Down
5 changes: 5 additions & 0 deletions Dockerfile.tap_aggregator
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
FROM rust:1.83-bookworm as build

WORKDIR /root

RUN apt-get update && apt-get install -y --no-install-recommends \
protobuf-compiler \
&& rm -rf /var/lib/apt/lists/*

COPY . .

RUN cargo build --release --bin tap_aggregator
Expand Down
9 changes: 8 additions & 1 deletion tap_aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,14 @@ axum = { version = "0.7.5", features = [
futures-util = "0.3.28"
lazy_static = "1.4.0"
ruint = "1.10.1"
tower = { version = "0.4", features = ["util"] }
tower = { version = "0.4", features = ["util", "steer"] }
tonic = { version = "0.12.3", features = ["transport", "zstd"] }
prost = "0.13.3"
hyper = { version = "1", features = ["full"] }

[build-dependencies]
tonic-build = "0.12.3"


[dev-dependencies]
jsonrpsee = { workspace = true, features = ["http-client", "jsonrpsee-core"] }
Expand Down
11 changes: 11 additions & 0 deletions tap_aggregator/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Running build.rs...");
let out_dir = std::env::var("OUT_DIR").expect("OUT_DIR not set by Cargo");
println!("OUT_DIR: {}", out_dir); // This should print the output directory

tonic_build::compile_protos("./proto/tap_aggregator.proto")?;
Ok(())
}
48 changes: 48 additions & 0 deletions tap_aggregator/proto/tap_aggregator.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

syntax = "proto3";
package tap_aggregator.v1;

message Receipt {
bytes allocation_id = 1;
uint64 timestamp_ns = 2;
uint64 nonce = 3;
Uint128 value = 4;
}

message SignedReceipt {
Receipt message = 1;
bytes signature = 2;
}

message ReceiptAggregateVoucher {
bytes allocation_id = 1;
uint64 timestamp_ns = 2;
Uint128 value_aggregate = 3;
}

message SignedRav {
ReceiptAggregateVoucher message = 1;
bytes signature = 2;
}

message RavRequest {
repeated SignedReceipt receipts = 1;
optional SignedRav previous_rav = 2;
}

message RavResponse {
SignedRav rav = 1;
}

service TapAggregator {
rpc AggregateReceipts(RavRequest) returns (RavResponse);
}

message Uint128 {
// Highest 64 bits of a 128 bit number.
uint64 high = 1;
// Lowest 64 bits of a 128 bit number.
uint64 low = 2;
}
135 changes: 135 additions & 0 deletions tap_aggregator/src/grpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::anyhow;
use tap_core::signed_message::EIP712SignedMessage;

tonic::include_proto!("tap_aggregator.v1");

impl TryFrom<Receipt> for tap_core::receipt::Receipt {
type Error = anyhow::Error;
fn try_from(receipt: Receipt) -> Result<Self, Self::Error> {
Ok(Self {
allocation_id: receipt.allocation_id.as_slice().try_into()?,
timestamp_ns: receipt.timestamp_ns,
value: receipt.value.ok_or(anyhow!("Missing value"))?.into(),
nonce: receipt.nonce,
})
}
}

impl TryFrom<SignedReceipt> for tap_core::receipt::SignedReceipt {
type Error = anyhow::Error;
fn try_from(receipt: SignedReceipt) -> Result<Self, Self::Error> {
Ok(Self {
signature: receipt.signature.as_slice().try_into()?,
message: receipt
.message
.ok_or(anyhow!("Missing message"))?
.try_into()?,
})
}
}

impl From<tap_core::receipt::Receipt> for Receipt {
fn from(value: tap_core::receipt::Receipt) -> Self {
Self {
allocation_id: value.allocation_id.as_slice().to_vec(),
timestamp_ns: value.timestamp_ns,
nonce: value.nonce,
value: Some(value.value.into()),
}
}
}

impl From<tap_core::receipt::SignedReceipt> for SignedReceipt {
fn from(value: tap_core::receipt::SignedReceipt) -> Self {
Self {
message: Some(value.message.into()),
signature: value.signature.as_bytes().to_vec(),
}
}
}

impl TryFrom<SignedRav> for EIP712SignedMessage<tap_core::rav::ReceiptAggregateVoucher> {
type Error = anyhow::Error;
fn try_from(voucher: SignedRav) -> Result<Self, Self::Error> {
Ok(Self {
signature: voucher.signature.as_slice().try_into()?,
message: voucher
.message
.ok_or(anyhow!("Missing message"))?
.try_into()?,
})
}
}

impl From<EIP712SignedMessage<tap_core::rav::ReceiptAggregateVoucher>> for SignedRav {
fn from(voucher: EIP712SignedMessage<tap_core::rav::ReceiptAggregateVoucher>) -> Self {
Self {
signature: voucher.signature.as_bytes().to_vec(),
message: Some(voucher.message.into()),
}
}
}

impl TryFrom<ReceiptAggregateVoucher> for tap_core::rav::ReceiptAggregateVoucher {
type Error = anyhow::Error;
fn try_from(voucher: ReceiptAggregateVoucher) -> Result<Self, Self::Error> {
Ok(Self {
allocationId: voucher.allocation_id.as_slice().try_into()?,
timestampNs: voucher.timestamp_ns,
valueAggregate: voucher
.value_aggregate
.ok_or(anyhow!("Missing Value Aggregate"))?
.into(),
})
}
}

impl From<tap_core::rav::ReceiptAggregateVoucher> for ReceiptAggregateVoucher {
fn from(voucher: tap_core::rav::ReceiptAggregateVoucher) -> Self {
Self {
allocation_id: voucher.allocationId.to_vec(),
timestamp_ns: voucher.timestampNs,
value_aggregate: Some(voucher.valueAggregate.into()),
}
}
}

impl From<Uint128> for u128 {
fn from(Uint128 { high, low }: Uint128) -> Self {
((high as u128) << 64) | low as u128
}
}

impl From<u128> for Uint128 {
fn from(value: u128) -> Self {
let high = (value >> 64) as u64;
let low = value as u64;
Self { high, low }
}
}

impl RavRequest {
pub fn new(
receipts: Vec<tap_core::receipt::SignedReceipt>,
previous_rav: Option<tap_core::rav::SignedRAV>,
) -> Self {
Self {
receipts: receipts.into_iter().map(Into::into).collect(),
previous_rav: previous_rav.map(Into::into),
}
}
}

impl RavResponse {
pub fn signed_rav(mut self) -> anyhow::Result<tap_core::rav::SignedRAV> {
let signed_rav: tap_core::rav::SignedRAV = self
.rav
.take()
.ok_or(anyhow!("Couldn't find rav"))?
.try_into()?;
Ok(signed_rav)
}
}
1 change: 1 addition & 0 deletions tap_aggregator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
pub mod aggregator;
pub mod api_versioning;
pub mod error_codes;
pub mod grpc;
pub mod jsonrpsee_helpers;
pub mod metrics;
pub mod server;
48 changes: 11 additions & 37 deletions tap_aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,15 @@

#![doc = include_str!("../README.md")]

use std::borrow::Cow;
use std::collections::HashSet;
use std::str::FromStr;

use alloy::dyn_abi::Eip712Domain;
use alloy::primitives::Address;
use alloy::primitives::FixedBytes;
use alloy::signers::local::PrivateKeySigner;
use std::{collections::HashSet, str::FromStr};

use alloy::{dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner};
use anyhow::Result;
use clap::Parser;
use ruint::aliases::U256;
use tokio::signal::unix::{signal, SignalKind};

use log::{debug, info};
use tap_aggregator::metrics;
use tap_aggregator::server;
use tap_core::tap_eip712_domain;

use tap_aggregator::{metrics, server};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -126,37 +119,22 @@ async fn main() -> Result<()> {
.await?;
info!("Server started. Listening on port {}.", args.port);

// Have tokio wait for SIGTERM or SIGINT.
let mut signal_sigint = signal(SignalKind::interrupt())?;
let mut signal_sigterm = signal(SignalKind::terminate())?;
tokio::select! {
_ = signal_sigint.recv() => debug!("Received SIGINT."),
_ = signal_sigterm.recv() => debug!("Received SIGTERM."),
}
let _ = handle.await;

// If we're here, we've received a signal to exit.
info!("Shutting down...");

// Stop the server and wait for it to finish gracefully.
handle.stop()?;
handle.stopped().await;

debug!("Goodbye!");
Ok(())
}

fn create_eip712_domain(args: &Args) -> Result<Eip712Domain> {
// Transfrom the args into the types expected by Eip712Domain::new().

// Transform optional strings into optional Cow<str>.
let name = args.domain_name.clone().map(Cow::Owned);
let version = args.domain_version.clone().map(Cow::Owned);

// Transform optional strings into optional U256.
if args.domain_chain_id.is_some() {
debug!("Parsing domain chain ID...");
}
let chain_id: Option<U256> = args
let chain_id: Option<u64> = args
.domain_chain_id
.as_ref()
.map(|s| s.parse())
Expand All @@ -165,17 +143,13 @@ fn create_eip712_domain(args: &Args) -> Result<Eip712Domain> {
if args.domain_salt.is_some() {
debug!("Parsing domain salt...");
}
let salt: Option<FixedBytes<32>> = args.domain_salt.as_ref().map(|s| s.parse()).transpose()?;

// Transform optional strings into optional Address.
let verifying_contract: Option<Address> = args.domain_verifying_contract;

// Create the EIP-712 domain separator.
Ok(Eip712Domain::new(
name,
version,
chain_id,
verifying_contract,
salt,
Ok(tap_eip712_domain(
chain_id.unwrap_or(1),
verifying_contract.unwrap_or_default(),
))
}
Loading

0 comments on commit 3c56018

Please sign in to comment.