Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: accept grpc requests in tap-aggregator #253

Merged
merged 4 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ jobs:
image: rust:1.83-bookworm
steps:
- uses: actions/checkout@v3
- name: Install protobuf compiler
run: apt-get update && apt-get install protobuf-compiler -y
- uses: actions/cache@v3
with:
path: |
Expand All @@ -50,6 +52,8 @@ jobs:
image: rust:1.83-bookworm
steps:
- uses: actions/checkout@v3
- name: Install protobuf compiler
run: apt-get update && apt-get install protobuf-compiler -y
- uses: actions/cache@v3
with:
path: |
Expand All @@ -75,6 +79,8 @@ jobs:
image: rust:1.83-bookworm
steps:
- uses: actions/checkout@v3
- name: Install protobuf compiler
run: apt-get update && apt-get install protobuf-compiler -y
- uses: actions/cache@v3
with:
path: |
Expand Down
5 changes: 5 additions & 0 deletions Dockerfile.tap_aggregator
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
FROM rust:1.83-bookworm as build

WORKDIR /root

RUN apt-get update && apt-get install -y --no-install-recommends \
protobuf-compiler \
&& rm -rf /var/lib/apt/lists/*

COPY . .

RUN cargo build --release --bin tap_aggregator
Expand Down
9 changes: 8 additions & 1 deletion tap_aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,14 @@ axum = { version = "0.7.5", features = [
futures-util = "0.3.28"
lazy_static = "1.4.0"
ruint = "1.10.1"
tower = { version = "0.4", features = ["util"] }
tower = { version = "0.4", features = ["util", "steer"] }
tonic = { version = "0.12.3", features = ["transport", "zstd"] }
prost = "0.13.3"
hyper = { version = "1", features = ["full"] }

[build-dependencies]
tonic-build = "0.12.3"


[dev-dependencies]
jsonrpsee = { workspace = true, features = ["http-client", "jsonrpsee-core"] }
Expand Down
11 changes: 11 additions & 0 deletions tap_aggregator/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Running build.rs...");
let out_dir = std::env::var("OUT_DIR").expect("OUT_DIR not set by Cargo");
println!("OUT_DIR: {}", out_dir); // This should print the output directory

tonic_build::compile_protos("./proto/tap_aggregator.proto")?;
Ok(())
}
48 changes: 48 additions & 0 deletions tap_aggregator/proto/tap_aggregator.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

syntax = "proto3";
package tap_aggregator.v1;

message Receipt {
bytes allocation_id = 1;
uint64 timestamp_ns = 2;
uint64 nonce = 3;
Uint128 value = 4;
}

message SignedReceipt {
Receipt message = 1;
bytes signature = 2;
}

message ReceiptAggregateVoucher {
bytes allocation_id = 1;
uint64 timestamp_ns = 2;
Uint128 value_aggregate = 3;
}

message SignedRav {
ReceiptAggregateVoucher message = 1;
bytes signature = 2;
}

message RavRequest {
repeated SignedReceipt receipts = 1;
optional SignedRav previous_rav = 2;
}

message RavResponse {
SignedRav rav = 1;
}

service TapAggregator {
rpc AggregateReceipts(RavRequest) returns (RavResponse);
}

message Uint128 {
// Highest 64 bits of a 128 bit number.
uint64 high = 1;
// Lowest 64 bits of a 128 bit number.
uint64 low = 2;
}
135 changes: 135 additions & 0 deletions tap_aggregator/src/grpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::anyhow;
use tap_core::signed_message::EIP712SignedMessage;

tonic::include_proto!("tap_aggregator.v1");

impl TryFrom<Receipt> for tap_core::receipt::Receipt {
type Error = anyhow::Error;
fn try_from(receipt: Receipt) -> Result<Self, Self::Error> {
Ok(Self {
allocation_id: receipt.allocation_id.as_slice().try_into()?,
timestamp_ns: receipt.timestamp_ns,
value: receipt.value.ok_or(anyhow!("Missing value"))?.into(),
nonce: receipt.nonce,
})
}
}

impl TryFrom<SignedReceipt> for tap_core::receipt::SignedReceipt {
type Error = anyhow::Error;
fn try_from(receipt: SignedReceipt) -> Result<Self, Self::Error> {
Ok(Self {
signature: receipt.signature.as_slice().try_into()?,
message: receipt
.message
.ok_or(anyhow!("Missing message"))?
.try_into()?,
})
}
}

impl From<tap_core::receipt::Receipt> for Receipt {
fn from(value: tap_core::receipt::Receipt) -> Self {
Self {
allocation_id: value.allocation_id.as_slice().to_vec(),
timestamp_ns: value.timestamp_ns,
nonce: value.nonce,
value: Some(value.value.into()),
}
}
}

impl From<tap_core::receipt::SignedReceipt> for SignedReceipt {
fn from(value: tap_core::receipt::SignedReceipt) -> Self {
Self {
message: Some(value.message.into()),
signature: value.signature.as_bytes().to_vec(),
}
}
}

impl TryFrom<SignedRav> for EIP712SignedMessage<tap_core::rav::ReceiptAggregateVoucher> {
type Error = anyhow::Error;
fn try_from(voucher: SignedRav) -> Result<Self, Self::Error> {
Ok(Self {
signature: voucher.signature.as_slice().try_into()?,
message: voucher
.message
.ok_or(anyhow!("Missing message"))?
.try_into()?,
})
}
}

impl From<EIP712SignedMessage<tap_core::rav::ReceiptAggregateVoucher>> for SignedRav {
fn from(voucher: EIP712SignedMessage<tap_core::rav::ReceiptAggregateVoucher>) -> Self {
Self {
signature: voucher.signature.as_bytes().to_vec(),
message: Some(voucher.message.into()),
}
}
}

impl TryFrom<ReceiptAggregateVoucher> for tap_core::rav::ReceiptAggregateVoucher {
type Error = anyhow::Error;
fn try_from(voucher: ReceiptAggregateVoucher) -> Result<Self, Self::Error> {
Ok(Self {
allocationId: voucher.allocation_id.as_slice().try_into()?,
timestampNs: voucher.timestamp_ns,
valueAggregate: voucher
.value_aggregate
.ok_or(anyhow!("Missing Value Aggregate"))?
.into(),
})
}
}

impl From<tap_core::rav::ReceiptAggregateVoucher> for ReceiptAggregateVoucher {
fn from(voucher: tap_core::rav::ReceiptAggregateVoucher) -> Self {
Self {
allocation_id: voucher.allocationId.to_vec(),
timestamp_ns: voucher.timestampNs,
value_aggregate: Some(voucher.valueAggregate.into()),
}
}
}

impl From<Uint128> for u128 {
fn from(Uint128 { high, low }: Uint128) -> Self {
((high as u128) << 64) | low as u128
}
}

impl From<u128> for Uint128 {
fn from(value: u128) -> Self {
let high = (value >> 64) as u64;
let low = value as u64;
Self { high, low }
}
}

impl RavRequest {
pub fn new(
receipts: Vec<tap_core::receipt::SignedReceipt>,
previous_rav: Option<tap_core::rav::SignedRAV>,
) -> Self {
Self {
receipts: receipts.into_iter().map(Into::into).collect(),
previous_rav: previous_rav.map(Into::into),
}
}
}

impl RavResponse {
pub fn signed_rav(mut self) -> anyhow::Result<tap_core::rav::SignedRAV> {
let signed_rav: tap_core::rav::SignedRAV = self
.rav
.take()
.ok_or(anyhow!("Couldn't find rav"))?
.try_into()?;
Ok(signed_rav)
}
}
1 change: 1 addition & 0 deletions tap_aggregator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
pub mod aggregator;
pub mod api_versioning;
pub mod error_codes;
pub mod grpc;
pub mod jsonrpsee_helpers;
pub mod metrics;
pub mod server;
48 changes: 11 additions & 37 deletions tap_aggregator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,15 @@

#![doc = include_str!("../README.md")]

use std::borrow::Cow;
use std::collections::HashSet;
use std::str::FromStr;

use alloy::dyn_abi::Eip712Domain;
use alloy::primitives::Address;
use alloy::primitives::FixedBytes;
use alloy::signers::local::PrivateKeySigner;
use std::{collections::HashSet, str::FromStr};

use alloy::{dyn_abi::Eip712Domain, primitives::Address, signers::local::PrivateKeySigner};
use anyhow::Result;
use clap::Parser;
use ruint::aliases::U256;
use tokio::signal::unix::{signal, SignalKind};

use log::{debug, info};
use tap_aggregator::metrics;
use tap_aggregator::server;
use tap_core::tap_eip712_domain;

use tap_aggregator::{metrics, server};

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
Expand Down Expand Up @@ -126,37 +119,22 @@ async fn main() -> Result<()> {
.await?;
info!("Server started. Listening on port {}.", args.port);

// Have tokio wait for SIGTERM or SIGINT.
let mut signal_sigint = signal(SignalKind::interrupt())?;
let mut signal_sigterm = signal(SignalKind::terminate())?;
tokio::select! {
_ = signal_sigint.recv() => debug!("Received SIGINT."),
_ = signal_sigterm.recv() => debug!("Received SIGTERM."),
}
let _ = handle.await;

// If we're here, we've received a signal to exit.
info!("Shutting down...");

// Stop the server and wait for it to finish gracefully.
handle.stop()?;
handle.stopped().await;

debug!("Goodbye!");
Ok(())
}

fn create_eip712_domain(args: &Args) -> Result<Eip712Domain> {
// Transfrom the args into the types expected by Eip712Domain::new().

// Transform optional strings into optional Cow<str>.
let name = args.domain_name.clone().map(Cow::Owned);
let version = args.domain_version.clone().map(Cow::Owned);

// Transform optional strings into optional U256.
if args.domain_chain_id.is_some() {
debug!("Parsing domain chain ID...");
}
let chain_id: Option<U256> = args
let chain_id: Option<u64> = args
.domain_chain_id
.as_ref()
.map(|s| s.parse())
Expand All @@ -165,17 +143,13 @@ fn create_eip712_domain(args: &Args) -> Result<Eip712Domain> {
if args.domain_salt.is_some() {
debug!("Parsing domain salt...");
}
let salt: Option<FixedBytes<32>> = args.domain_salt.as_ref().map(|s| s.parse()).transpose()?;

// Transform optional strings into optional Address.
let verifying_contract: Option<Address> = args.domain_verifying_contract;

// Create the EIP-712 domain separator.
Ok(Eip712Domain::new(
name,
version,
chain_id,
verifying_contract,
salt,
Ok(tap_eip712_domain(
chain_id.unwrap_or(1),
verifying_contract.unwrap_or_default(),
))
}
Loading
Loading