Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: move EventSender out of BeaconConsensusEngineHandle #13533

Merged
merged 1 commit into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 3 additions & 13 deletions crates/consensus/beacon/src/engine/handle.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! `BeaconConsensusEngine` external API

use crate::{BeaconConsensusEngineEvent, BeaconForkChoiceUpdateError};
use crate::BeaconForkChoiceUpdateError;
use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
};
Expand All @@ -10,7 +10,6 @@ use reth_engine_primitives::{
OnForkChoiceUpdated,
};
use reth_errors::RethResult;
use reth_tokio_util::{EventSender, EventStream};
use tokio::sync::{mpsc::UnboundedSender, oneshot};

/// A _shareable_ beacon consensus frontend type. Used to interact with the spawned beacon consensus
Expand All @@ -23,7 +22,6 @@ where
Engine: EngineTypes,
{
pub(crate) to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
event_sender: EventSender<BeaconConsensusEngineEvent>,
}

// === impl BeaconConsensusEngineHandle ===
Expand All @@ -33,11 +31,8 @@ where
Engine: EngineTypes,
{
/// Creates a new beacon consensus engine handle.
pub const fn new(
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
event_sender: EventSender<BeaconConsensusEngineEvent>,
) -> Self {
Self { to_engine, event_sender }
pub const fn new(to_engine: UnboundedSender<BeaconEngineMessage<Engine>>) -> Self {
Self { to_engine }
}

/// Sends a new payload message to the beacon consensus engine and waits for a response.
Expand Down Expand Up @@ -96,9 +91,4 @@ where
pub fn transition_configuration_exchanged(&self) {
let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged);
}

/// Creates a new [`BeaconConsensusEngineEvent`] listener stream.
pub fn event_listener(&self) -> EventStream<BeaconConsensusEngineEvent> {
self.event_sender.new_listener()
}
}
2 changes: 1 addition & 1 deletion crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ where
hooks: EngineHooks,
) -> RethResult<(Self, BeaconConsensusEngineHandle<N::Engine>)> {
let event_sender = EventSender::default();
let handle = BeaconConsensusEngineHandle::new(to_engine, event_sender.clone());
let handle = BeaconConsensusEngineHandle::new(to_engine);
let sync = EngineSyncController::new(
pipeline,
client,
Expand Down
5 changes: 2 additions & 3 deletions crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,7 @@ where
info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");

let event_sender = EventSender::default();
let beacon_engine_handle =
BeaconConsensusEngineHandle::new(consensus_engine_tx.clone(), event_sender.clone());
let beacon_engine_handle = BeaconConsensusEngineHandle::new(consensus_engine_tx.clone());

// extract the jwt secret from the args if possible
let jwt_secret = ctx.auth_jwt_secret()?;
Expand Down Expand Up @@ -271,7 +270,7 @@ where
info!(target: "reth::cli", "Consensus engine initialized");

let events = stream_select!(
beacon_engine_handle.event_listener().map(Into::into),
event_sender.new_listener().map(Into::into),
pipeline_events.map(Into::into),
if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
Either::Left(
Expand Down
3 changes: 1 addition & 2 deletions crates/rpc/rpc-builder/tests/it/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ pub const fn test_address() -> SocketAddr {
pub async fn launch_auth(secret: JwtSecret) -> AuthServerHandle {
let config = AuthServerConfig::builder(secret).socket_addr(test_address()).build();
let (tx, _rx) = unbounded_channel();
let beacon_engine_handle =
BeaconConsensusEngineHandle::<EthEngineTypes>::new(tx, Default::default());
let beacon_engine_handle = BeaconConsensusEngineHandle::<EthEngineTypes>::new(tx);
let client = ClientVersionV1 {
code: ClientCode::RH,
name: "Reth".to_string(),
Expand Down
5 changes: 1 addition & 4 deletions crates/rpc/rpc-engine-api/src/engine_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,6 @@ mod tests {
use super::*;
use alloy_rpc_types_engine::{ClientCode, ClientVersionV1};
use assert_matches::assert_matches;
use reth_beacon_consensus::BeaconConsensusEngineEvent;
use reth_chainspec::{ChainSpec, MAINNET};
use reth_engine_primitives::BeaconEngineMessage;
use reth_ethereum_engine_primitives::{EthEngineTypes, EthereumEngineValidator};
Expand All @@ -1041,7 +1040,6 @@ mod tests {
use reth_rpc_types_compat::engine::payload::execution_payload_from_sealed_block;
use reth_tasks::TokioTaskExecutor;
use reth_testing_utils::generators::random_block;
use reth_tokio_util::EventSender;
use reth_transaction_pool::noop::NoopTransactionPool;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};

Expand All @@ -1066,12 +1064,11 @@ mod tests {
let provider = Arc::new(MockEthProvider::default());
let payload_store = spawn_test_payload_service();
let (to_engine, engine_rx) = unbounded_channel();
let event_sender: EventSender<BeaconConsensusEngineEvent> = Default::default();
let task_executor = Box::<TokioTaskExecutor>::default();
let api = EngineApi::new(
provider.clone(),
chain_spec.clone(),
BeaconConsensusEngineHandle::new(to_engine, event_sender),
BeaconConsensusEngineHandle::new(to_engine),
payload_store.into(),
NoopTransactionPool::default(),
task_executor,
Expand Down
Loading