Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(core): collect runs batched checks #180

Merged
merged 4 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions tap_core/src/tap_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,19 @@ impl<
let mut accepted_signed_receipts = Vec::<SignedReceipt>::new();
let mut failed_signed_receipts = Vec::<SignedReceipt>::new();

for (receipt_id, mut received_receipt) in received_receipts {
received_receipt
.finalize_receipt_checks(receipt_id, &self.receipt_auditor)
.await?;
let mut received_receipts: Vec<ReceivedReceipt> =
received_receipts.into_iter().map(|e| e.1).collect();

for check in self.required_checks.iter() {
ReceivedReceipt::perform_check_batch(
&mut received_receipts,
check,
&self.receipt_auditor,
)
.await?;
}

for received_receipt in received_receipts {
if received_receipt.is_accepted() {
accepted_signed_receipts.push(received_receipt.signed_receipt);
} else {
Expand Down
147 changes: 147 additions & 0 deletions tap_core/src/tap_receipt/receipt_auditor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright 2023-, Semiotic AI, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashSet;

use alloy_sol_types::Eip712Domain;
use ethers::types::Signature;
use tokio::sync::RwLock;

use crate::{
Expand All @@ -12,6 +15,8 @@ use crate::{
Error, Result,
};

use super::ReceivedReceipt;

pub struct ReceiptAuditor<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> {
domain_separator: Eip712Domain,
escrow_adapter: EA,
Expand Down Expand Up @@ -58,6 +63,25 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
}
}

pub async fn check_batch(
&self,
receipt_check: &ReceiptCheck,
received_receipts: &mut [ReceivedReceipt],
) -> Vec<ReceiptResult<()>> {
match receipt_check {
ReceiptCheck::CheckUnique => self.check_uniqueness_batch(received_receipts).await,
ReceiptCheck::CheckAllocationId => {
self.check_allocation_id_batch(received_receipts).await
}
ReceiptCheck::CheckSignature => self.check_signature_batch(received_receipts).await,
ReceiptCheck::CheckTimestamp => self.check_timestamp_batch(received_receipts).await,
ReceiptCheck::CheckValue => self.check_value_batch(received_receipts).await,
ReceiptCheck::CheckAndReserveEscrow => {
self.check_and_reserve_escrow_batch(received_receipts).await
}
}
}

async fn check_uniqueness(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand All @@ -76,6 +100,33 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
Ok(())
}

async fn check_uniqueness_batch(
&self,
received_receipts: &mut [ReceivedReceipt],
) -> Vec<ReceiptResult<()>> {
let mut results = Vec::new();

// If at least one of the receipts in the batch hasn't been checked for uniqueness yet, check the whole batch.
if received_receipts
.iter()
.filter(|r| r.checks.contains_key(&ReceiptCheck::CheckUnique))
.any(|r| r.checks[&ReceiptCheck::CheckUnique].is_none())
{
let mut signatures: HashSet<Signature> = HashSet::new();

for received_receipt in received_receipts {
let signature = received_receipt.signed_receipt.signature;
if signatures.insert(signature) {
results.push(Ok(()));
} else {
results.push(Err(ReceiptError::NonUniqueReceipt));
}
}
}

results
}

async fn check_allocation_id(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand All @@ -95,6 +146,25 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
Ok(())
}

async fn check_allocation_id_batch(
&self,
received_receipts: &mut [ReceivedReceipt],
) -> Vec<ReceiptResult<()>> {
let mut results = Vec::new();

for received_receipt in received_receipts
.iter_mut()
.filter(|r| r.checks.contains_key(&ReceiptCheck::CheckAllocationId))
{
if received_receipt.checks[&ReceiptCheck::CheckAllocationId].is_none() {
let signed_receipt = &received_receipt.signed_receipt;
results.push(self.check_allocation_id(signed_receipt).await);
}
}

results
}

async fn check_timestamp(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand All @@ -108,6 +178,26 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
}
Ok(())
}

async fn check_timestamp_batch(
&self,
received_receipts: &mut [ReceivedReceipt],
) -> Vec<ReceiptResult<()>> {
let mut results = Vec::new();

for received_receipt in received_receipts
.iter_mut()
.filter(|r| r.checks.contains_key(&ReceiptCheck::CheckTimestamp))
{
if received_receipt.checks[&ReceiptCheck::CheckTimestamp].is_none() {
let signed_receipt = &received_receipt.signed_receipt;
results.push(self.check_timestamp(signed_receipt).await);
}
}

results
}

async fn check_value(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand All @@ -128,6 +218,28 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
Ok(())
}

async fn check_value_batch(
&self,
received_receipts: &mut [ReceivedReceipt],
) -> Vec<ReceiptResult<()>> {
let mut results = Vec::new();

for received_receipt in received_receipts
.iter_mut()
.filter(|r| r.checks.contains_key(&ReceiptCheck::CheckValue))
{
if received_receipt.checks[&ReceiptCheck::CheckValue].is_none() {
let signed_receipt = &received_receipt.signed_receipt;
results.push(
self.check_value(signed_receipt, received_receipt.query_id)
.await,
);
}
}

results
}

async fn check_signature(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand Down Expand Up @@ -155,6 +267,25 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
Ok(())
}

async fn check_signature_batch(
&self,
received_receipts: &mut [ReceivedReceipt],
) -> Vec<ReceiptResult<()>> {
let mut results = Vec::new();

for received_receipt in received_receipts
.iter_mut()
.filter(|r| r.checks.contains_key(&ReceiptCheck::CheckSignature))
{
if received_receipt.checks[&ReceiptCheck::CheckSignature].is_none() {
let signed_receipt = &received_receipt.signed_receipt;
results.push(self.check_signature(signed_receipt).await);
}
}

results
}

async fn check_and_reserve_escrow(
&self,
signed_receipt: &EIP712SignedMessage<Receipt>,
Expand All @@ -176,6 +307,22 @@ impl<EA: EscrowAdapter, RCA: ReceiptChecksAdapter> ReceiptAuditor<EA, RCA> {
Ok(())
}

async fn check_and_reserve_escrow_batch(
&self,
received_receipts: &mut [ReceivedReceipt],
) -> Vec<ReceiptResult<()>> {
let mut results = Vec::new();

for received_receipt in received_receipts.iter_mut().filter(|r| {
r.escrow_reserve_attempt_required() && !r.escrow_reserve_attempt_completed()
}) {
let signed_receipt = &received_receipt.signed_receipt;
results.push(self.check_and_reserve_escrow(signed_receipt).await);
}

results
}

pub async fn check_rav_signature(
&self,
signed_rav: &EIP712SignedMessage<ReceiptAggregateVoucher>,
Expand Down
21 changes: 18 additions & 3 deletions tap_core/src/tap_receipt/received_receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,21 @@ impl ReceivedReceipt {
result
}

pub async fn perform_check_batch<CA: EscrowAdapter, RCA: ReceiptChecksAdapter>(
batch: &mut [Self],
check: &ReceiptCheck,
receipt_auditor: &ReceiptAuditor<CA, RCA>,
) -> Result<()> {
let results = receipt_auditor.check_batch(check, batch).await;

for (receipt, result) in batch.iter_mut().zip(results) {
receipt.update_check(check, Some(result))?;
receipt.update_state();
}

Ok(())
}

/// Completes a list of *incomplete* check and stores the result, if the check already has a result it is skipped
///
/// Returns `Err` only if unable to complete a check, returns `Ok` if the checks were completed (*Important:* this is not the result of the check, just the result of _completing_ the check)
Expand Down Expand Up @@ -293,7 +308,7 @@ impl ReceivedReceipt {
}

/// Updates receieved receipt state based on internal values, should be called anytime internal state changes
fn update_state(&mut self) {
pub(crate) fn update_state(&mut self) {
let mut next_state = self.state.clone();
match self.state {
ReceiptState::Received => {
Expand Down Expand Up @@ -357,14 +372,14 @@ impl ReceivedReceipt {
ReceiptState::AwaitingReserveEscrow
}

fn escrow_reserve_attempt_completed(&self) -> bool {
pub(crate) fn escrow_reserve_attempt_completed(&self) -> bool {
if let Some(escrow_reserve_attempt) = &self.escrow_reserved {
return escrow_reserve_attempt.is_some();
}
false
}

fn escrow_reserve_attempt_required(&self) -> bool {
pub(crate) fn escrow_reserve_attempt_required(&self) -> bool {
self.escrow_reserved.is_some()
}

Expand Down
Loading