Skip to content

Commit

Permalink
Merge pull request #943 from carver/refactor-history-content-proxy
Browse files Browse the repository at this point in the history
refactor: reuse logic to fetch history content
  • Loading branch information
carver authored Oct 4, 2023
2 parents c1cb241 + 674ca53 commit 040ad41
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 41 deletions.
30 changes: 30 additions & 0 deletions rpc/src/fetch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/// Fetch data from related Portal networks
use serde_json::Value;
use tokio::sync::mpsc;

use ethportal_api::types::jsonrpc::endpoints::HistoryEndpoint;
use ethportal_api::types::jsonrpc::request::HistoryJsonRpcRequest;

use crate::errors::RpcServeError;

pub async fn proxy_query_to_history_subnet(
network: &mpsc::UnboundedSender<HistoryJsonRpcRequest>,
endpoint: HistoryEndpoint,
) -> Result<Value, RpcServeError> {
let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<Result<Value, String>>();
let message = HistoryJsonRpcRequest {
endpoint,
resp: resp_tx,
};
let _ = network.send(message);

match resp_rx.recv().await {
Some(val) => match val {
Ok(result) => Ok(result),
Err(msg) => Err(RpcServeError::Message(msg)),
},
None => Err(RpcServeError::Message(
"Internal error: No response from chain history subnetwork".to_string(),
)),
}
}
59 changes: 18 additions & 41 deletions rpc/src/history_rpc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::errors::RpcServeError;
use crate::fetch::proxy_query_to_history_subnet;
use crate::serde::from_value;

use crate::jsonrpsee::core::{async_trait, RpcResult};
Expand All @@ -16,7 +16,6 @@ use ethportal_api::HistoryContentValue;
use ethportal_api::HistoryNetworkApiServer;
use ethportal_api::PossibleHistoryContentValue;
use ethportal_api::RoutingTableInfo;
use serde_json::Value;
use tokio::sync::mpsc;

pub struct HistoryNetworkApi {
Expand All @@ -27,76 +26,54 @@ impl HistoryNetworkApi {
pub fn new(network: mpsc::UnboundedSender<HistoryJsonRpcRequest>) -> Self {
Self { network }
}

pub async fn proxy_query_to_history_subnet(
&self,
endpoint: HistoryEndpoint,
) -> Result<Value, RpcServeError> {
let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<Result<Value, String>>();
let message = HistoryJsonRpcRequest {
endpoint,
resp: resp_tx,
};
let _ = self.network.send(message);

match resp_rx.recv().await {
Some(val) => match val {
Ok(result) => Ok(result),
Err(msg) => Err(RpcServeError::Message(msg)),
},
None => Err(RpcServeError::Message(
"Internal error: No response from chain history subnetwork".to_string(),
)),
}
}
}

#[async_trait]
impl HistoryNetworkApiServer for HistoryNetworkApi {
/// Returns meta information about overlay routing table.
async fn routing_table_info(&self) -> RpcResult<RoutingTableInfo> {
let endpoint = HistoryEndpoint::RoutingTableInfo;
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let result: RoutingTableInfo = from_value(result)?;
Ok(result)
}

/// Write an Ethereum Node Record to the overlay routing table.
async fn add_enr(&self, enr: Enr) -> RpcResult<bool> {
let endpoint = HistoryEndpoint::AddEnr(enr);
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let result: bool = from_value(result)?;
Ok(result)
}

/// Fetch the latest ENR associated with the given node ID.
async fn get_enr(&self, node_id: NodeId) -> RpcResult<Enr> {
let endpoint = HistoryEndpoint::GetEnr(node_id);
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let result: Enr = from_value(result)?;
Ok(result)
}

/// Delete Node ID from the overlay routing table.
async fn delete_enr(&self, node_id: NodeId) -> RpcResult<bool> {
let endpoint = HistoryEndpoint::DeleteEnr(node_id);
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let result: bool = from_value(result)?;
Ok(result)
}

/// Fetch the ENR representation associated with the given Node ID.
async fn lookup_enr(&self, node_id: NodeId) -> RpcResult<Enr> {
let endpoint = HistoryEndpoint::LookupEnr(node_id);
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let result: Enr = from_value(result)?;
Ok(result)
}

/// Send a PING message to the designated node and wait for a PONG response
async fn ping(&self, enr: Enr) -> RpcResult<PongInfo> {
let endpoint = HistoryEndpoint::Ping(enr);
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let result: PongInfo = from_value(result)?;
Ok(result)
}
Expand All @@ -105,23 +82,23 @@ impl HistoryNetworkApiServer for HistoryNetworkApi {
/// peer and wait for a response
async fn find_nodes(&self, enr: Enr, distances: Vec<u16>) -> RpcResult<FindNodesInfo> {
let endpoint = HistoryEndpoint::FindNodes(enr, distances);
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let result: FindNodesInfo = from_value(result)?;
Ok(result)
}

/// Lookup a target node within in the network
async fn recursive_find_nodes(&self, node_id: NodeId) -> RpcResult<Vec<Enr>> {
let endpoint = HistoryEndpoint::RecursiveFindNodes(node_id);
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let result: Vec<Enr> = from_value(result)?;
Ok(result)
}

/// Lookup a target node within in the network
async fn radius(&self) -> RpcResult<DataRadius> {
let endpoint = HistoryEndpoint::DataRadius;
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let result: DataRadius = from_value(result)?;
Ok(result)
}
Expand All @@ -133,7 +110,7 @@ impl HistoryNetworkApiServer for HistoryNetworkApi {
content_key: HistoryContentKey,
) -> RpcResult<ContentInfo> {
let endpoint = HistoryEndpoint::FindContent(enr, content_key);
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let result: ContentInfo = from_value(result)?;
Ok(result)
}
Expand All @@ -144,7 +121,7 @@ impl HistoryNetworkApiServer for HistoryNetworkApi {
content_key: HistoryContentKey,
) -> RpcResult<ContentInfo> {
let endpoint = HistoryEndpoint::RecursiveFindContent(content_key);
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
if result == serde_json::Value::String(CONTENT_ABSENT.to_string()) {
return Ok(ContentInfo::Content {
content: PossibleHistoryContentValue::ContentAbsent,
Expand All @@ -161,7 +138,7 @@ impl HistoryNetworkApiServer for HistoryNetworkApi {
content_key: HistoryContentKey,
) -> RpcResult<TraceContentInfo> {
let endpoint = HistoryEndpoint::TraceRecursiveFindContent(content_key);
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let info: TraceContentInfo = from_value(result)?;
Ok(info)
}
Expand All @@ -173,7 +150,7 @@ impl HistoryNetworkApiServer for HistoryNetworkApi {
limit: u64,
) -> RpcResult<PaginateLocalContentInfo> {
let endpoint = HistoryEndpoint::PaginateLocalContentKeys(offset, limit);
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let result: PaginateLocalContentInfo = from_value(result)?;
Ok(result)
}
Expand All @@ -186,7 +163,7 @@ impl HistoryNetworkApiServer for HistoryNetworkApi {
content_value: HistoryContentValue,
) -> RpcResult<u32> {
let endpoint = HistoryEndpoint::Gossip(content_key, content_value);
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let result: u32 = from_value(result)?;
Ok(result)
}
Expand All @@ -200,7 +177,7 @@ impl HistoryNetworkApiServer for HistoryNetworkApi {
content_value: Option<HistoryContentValue>,
) -> RpcResult<AcceptInfo> {
let endpoint = HistoryEndpoint::Offer(enr, content_key, content_value);
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let result: AcceptInfo = from_value(result)?;
Ok(result)
}
Expand All @@ -212,7 +189,7 @@ impl HistoryNetworkApiServer for HistoryNetworkApi {
content_value: HistoryContentValue,
) -> RpcResult<bool> {
let endpoint = HistoryEndpoint::Store(content_key, content_value);
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
let result: bool = from_value(result)?;
Ok(result)
}
Expand All @@ -223,7 +200,7 @@ impl HistoryNetworkApiServer for HistoryNetworkApi {
content_key: HistoryContentKey,
) -> RpcResult<PossibleHistoryContentValue> {
let endpoint = HistoryEndpoint::LocalContent(content_key);
let result = self.proxy_query_to_history_subnet(endpoint).await?;
let result = proxy_query_to_history_subnet(&self.network, endpoint).await?;
if result == serde_json::Value::String(CONTENT_ABSENT.to_string()) {
return Ok(PossibleHistoryContentValue::ContentAbsent);
};
Expand Down
1 change: 1 addition & 0 deletions rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod cors;
mod discv5_rpc;
mod errors;
mod eth_rpc;
mod fetch;
mod history_rpc;
mod rpc_server;
mod serde;
Expand Down

0 comments on commit 040ad41

Please sign in to comment.