From c76c31201de3083367b5311d47bd39006203a375 Mon Sep 17 00:00:00 2001 From: sebasti810 Date: Fri, 8 Nov 2024 12:17:23 +0100 Subject: [PATCH] first little cleanup, mocha sampling works now, lets do a rebase --- .../lightclient/wasm/js/src/App.tsx | 21 --- .../lightclient/wasm/js/src/index.css | 2 - .../lightclient/wasm/src/celestia/client.rs | 13 +- .../lightclient/wasm/src/celestia/config.rs | 18 ++- .../node_types/lightclient/wasm/src/client.rs | 12 +- .../node_types/lightclient/wasm/src/worker.rs | 13 +- .../wasm/src/worker_communication.rs | 127 +++++++----------- crates/tests/src/lib.rs | 28 ++-- 8 files changed, 89 insertions(+), 145 deletions(-) diff --git a/crates/node_types/lightclient/wasm/js/src/App.tsx b/crates/node_types/lightclient/wasm/js/src/App.tsx index 82788259..2dbaee24 100644 --- a/crates/node_types/lightclient/wasm/js/src/App.tsx +++ b/crates/node_types/lightclient/wasm/js/src/App.tsx @@ -1,11 +1,9 @@ import { useState } from 'react' import init, { LightClientWorker, WasmLightClient } from "../../pkg/prism_wasm_lightclient.js"; -/* import OperationSubmitter from './components/OperationSubmitter.js'; */ function App() { const [status, setStatus] = useState('Not started') const [height, setHeight] = useState(0) - const [client, setClient] = useState(null) const startLightClient = async () => { try { @@ -17,7 +15,6 @@ function App() { worker.run() const client = await new WasmLightClient(channel.port2) - setClient(client); setStatus('Running') channel.port2.onmessage = (event) => { @@ -36,15 +33,6 @@ function App() { } } -/* const verifyRandomEpoch = async () => { - try { - setStatus('Verifying epoch...') - setStatus('Epoch verified successfully') - } catch (error) { - setStatus(`Verification failed: ${error}`) - } - } */ - return (

Light Client Test

@@ -67,16 +55,7 @@ function App() { > Start Light Client - - {/* */}
- {/* client && */} ) } diff --git a/crates/node_types/lightclient/wasm/js/src/index.css b/crates/node_types/lightclient/wasm/js/src/index.css index e7d4bb2f..225cb731 100644 --- a/crates/node_types/lightclient/wasm/js/src/index.css +++ b/crates/node_types/lightclient/wasm/js/src/index.css @@ -29,8 +29,6 @@ a:hover { body { margin: 0; display: flex; - place-items: center; - min-width: 320px; min-height: 100vh; } diff --git a/crates/node_types/lightclient/wasm/src/celestia/client.rs b/crates/node_types/lightclient/wasm/src/celestia/client.rs index 51fcfa9e..bfc2c7d9 100644 --- a/crates/node_types/lightclient/wasm/src/celestia/client.rs +++ b/crates/node_types/lightclient/wasm/src/celestia/client.rs @@ -33,17 +33,12 @@ pub struct WasmCelestiaClient { impl WasmCelestiaClient { pub async fn new(config: CelestiaConfig) -> Result { - let bridge_addr = CelestiaConfig::fetch_bridge_webtransport_multiaddr().await; - - console::log_2( - &"🚀 Bridge address:".into(), - &bridge_addr.to_string().into(), - ); + /* let bridge_addr = CelestiaConfig::fetch_bridge_webtransport_multiaddr().await; */ let current_height = Arc::new(AtomicU64::new(config.start_height)); - let mut wasm_node_config = WasmNodeConfig::default(Network::Private); - wasm_node_config.set_bridge_bootnode(bridge_addr.to_string()); + let mut wasm_node_config = WasmNodeConfig::default(Network::Mocha); + wasm_node_config.set_bridge_bootnode(/* bridge_addr.to_string() */); let node_config = wasm_node_config.initialize_node_config().await?; let (node, mut event_subscriber) = Node::new_subscribed(node_config).await?; @@ -154,7 +149,7 @@ impl WasmCelestiaClient { .request_all_blobs( &header, Namespace::new_v0(&namespace).unwrap(), - Some(Duration::from_secs(5)), + Some(Duration::from_secs(7)), ) .await { diff --git a/crates/node_types/lightclient/wasm/src/celestia/config.rs b/crates/node_types/lightclient/wasm/src/celestia/config.rs index ede3c4d2..3be76cde 100644 --- a/crates/node_types/lightclient/wasm/src/celestia/config.rs +++ b/crates/node_types/lightclient/wasm/src/celestia/config.rs @@ -3,8 +3,7 @@ use celestia_rpc::{Client, P2PClient}; use libp2p::{identity::Keypair, multiaddr::Protocol, Multiaddr}; use lumina_node::{ - blockstore::IndexedDbBlockstore, events::NodeEvent, network::network_id, store::IndexedDbStore, - NodeConfig, + blockstore::IndexedDbBlockstore, network::network_id, store::IndexedDbStore, NodeConfig, }; use lumina_node_wasm::{ client::WasmNodeConfig, @@ -27,7 +26,7 @@ impl Default for CelestiaConfig { fn default() -> Self { Self { node_url: "ws://localhost:26658".to_string(), - start_height: 0, + start_height: 3093687, snark_namespace_id: "00000000000000de1008".to_string(), operation_namespace_id: Some("00000000000000de1009".to_string()), } @@ -66,7 +65,7 @@ pub trait WasmNodeConfigExt { &self, ) -> Result, JsError>; - fn set_bridge_bootnode(&mut self, bridge_addr: String); + fn set_bridge_bootnode(&mut self /* , bridge_addr: String */); } impl WasmNodeConfigExt for WasmNodeConfig { @@ -88,6 +87,7 @@ impl WasmNodeConfigExt for WasmNodeConfig { // Process bootnodes let mut p2p_bootnodes = Vec::with_capacity(self.bootnodes.len()); + for addr in &self.bootnodes { console::log_1(&format!("🚀 Adding bootnode: {}", addr).into()); let addr = addr @@ -116,7 +116,13 @@ impl WasmNodeConfigExt for WasmNodeConfig { }) } - fn set_bridge_bootnode(&mut self, bridge_addr: String) { - self.bootnodes = vec![bridge_addr]; + fn set_bridge_bootnode(&mut self /* , bridge_addr: String */) { + self.bootnodes = vec![ + "/dnsaddr/da-bridge-mocha-4.celestia-mocha.com/p2p/12D3KooWCBAbQbJSpCpCGKzqz3rAN4ixYbc63K68zJg9aisuAajg".to_string(), + "/dnsaddr/da-bridge-mocha-4-2.celestia-mocha.com/p2p/12D3KooWK6wJkScGQniymdWtBwBuU36n6BRXp9rCDDUD6P5gJr3G".to_string(), + "/dnsaddr/da-full-1-mocha-4.celestia-mocha.com/p2p/12D3KooWCUHPLqQXZzpTx1x3TAsdn3vYmTNDhzg66yG8hqoxGGN8".to_string(), + "/dnsaddr/da-full-2-mocha-4.celestia-mocha.com/p2p/12D3KooWR6SHsXPkkvhCRn6vp1RqSefgaT1X1nMNvrVjU2o3GoYy".to_string(), + ]; + /* self.bootnodes = vec![bridge_addr]; */ } } diff --git a/crates/node_types/lightclient/wasm/src/client.rs b/crates/node_types/lightclient/wasm/src/client.rs index 26f9b47c..563bd7b7 100644 --- a/crates/node_types/lightclient/wasm/src/client.rs +++ b/crates/node_types/lightclient/wasm/src/client.rs @@ -16,14 +16,18 @@ pub struct WasmLightClient { impl WasmLightClient { #[wasm_bindgen(constructor)] pub async fn new(port: MessagePort) -> Result { - let worker = WorkerClient::new(port)?; - Ok(Self { worker }) + Ok(Self { + worker: WorkerClient::new(port)?, + }) } #[wasm_bindgen(js_name = verifyEpoch)] pub async fn verify_epoch(&self, height: u64) -> Result<(), JsError> { - let command = LightClientCommand::VerifyEpoch { height }; - match self.worker.exec(command).await? { + match self + .worker + .exec(LightClientCommand::VerifyEpoch { height }) + .await? + { WorkerResponse::EpochVerified => Ok(()), WorkerResponse::Error(e) => Err(JsError::new(&e)), _ => Err(JsError::new("Unexpected response")), diff --git a/crates/node_types/lightclient/wasm/src/worker.rs b/crates/node_types/lightclient/wasm/src/worker.rs index eb4b787a..abbb909b 100644 --- a/crates/node_types/lightclient/wasm/src/worker.rs +++ b/crates/node_types/lightclient/wasm/src/worker.rs @@ -17,15 +17,10 @@ pub struct LightClientWorker { impl LightClientWorker { #[wasm_bindgen(constructor)] pub async fn new(port: MessagePort) -> Result { - console::log_1(&"• Initializing LightClientWorker ✔".into()); - let mut server = WorkerServer::new(); - server.initialize(port)?; - - let celestia = WasmCelestiaClient::new(CelestiaConfig::default()).await?; - - console::log_1(&"• Server registered ✔".into()); - - Ok(Self { server, celestia }) + Ok(Self { + server: WorkerServer::new(port)?, + celestia: WasmCelestiaClient::new(CelestiaConfig::default()).await?, + }) } pub async fn run(&mut self) -> Result<(), JsError> { diff --git a/crates/node_types/lightclient/wasm/src/worker_communication.rs b/crates/node_types/lightclient/wasm/src/worker_communication.rs index db701998..0d1c34d6 100644 --- a/crates/node_types/lightclient/wasm/src/worker_communication.rs +++ b/crates/node_types/lightclient/wasm/src/worker_communication.rs @@ -5,51 +5,8 @@ use web_sys::{console, MessageEvent, MessagePort}; use crate::commands::{LightClientCommand, WorkerResponse}; -struct ClientConnection { - port: MessagePort, - onmessage: Closure, -} - -impl ClientConnection { - fn new( - port: MessagePort, - server_tx: mpsc::UnboundedSender, - ) -> Result { - // We need the Closure because it's how we handle incoming messages from the MessagePort. It's basically our event handler. - let onmessage = Closure::new(move |message_event: MessageEvent| { - match from_value(message_event.data()) { - Ok(command) => { - if let Err(e) = server_tx.send(command) { - web_sys::console::error_1(&format!("Failed to send command: {}", e).into()); - } - } - Err(e) => { - web_sys::console::error_1( - &format!("Failed to deserialize message: {}", e).into(), - ); - } - } - }); - - port.set_onmessage(Some(onmessage.as_ref().unchecked_ref())); - - Ok(ClientConnection { port, onmessage }) - } - - fn send(&self, message: &WorkerResponse) -> Result<(), JsError> { - let message_value = to_value(message)?; - self.port - .post_message(&message_value) - .map_err(|e| JsError::new(&format!("Failed to post message: {:?}", e)))?; - Ok(()) - } -} - -impl Drop for ClientConnection { - fn drop(&mut self) { - self.port.set_onmessage(None); - } -} +// WorkerClient: Sends commands and receives responses in the main thread +// WorkerServer: Receives commands and sends responses in the worker thread pub struct WorkerClient { port: MessagePort, @@ -62,8 +19,15 @@ impl WorkerClient { let (response_tx, response_rx) = mpsc::unbounded_channel(); let onmessage = Closure::new(move |message_event: MessageEvent| { - if let Ok(response) = from_value(message_event.data()) { - response_tx.send(Ok(response)); + match from_value(message_event.data()) { + Ok(response) => { + if let Err(e) = response_tx.send(Ok(response)) { + console::error_1(&format!("Failed to forward response: {}", e).into()); + } + } + Err(e) => { + console::error_1(&format!("Failed to deserialize response: {}", e).into()); + } } }); @@ -79,10 +43,9 @@ impl WorkerClient { pub async fn exec(&self, command: LightClientCommand) -> Result { let mut response_channel = self.response_channel.lock().await; console::log_2(&"🩺 executing".into(), &to_value(&command)?); - let command_value = to_value(&command)?; self.port - .post_message(&command_value) + .post_message(&to_value(&command)?) .map_err(|e| JsError::new(&format!("Failed to post message: {:?}", e)))?; console::log_1(&"📨 message posted".into()); @@ -95,54 +58,56 @@ impl WorkerClient { } } -impl Drop for WorkerClient { - fn drop(&mut self) { - self.port.set_onmessage(None); - } -} - // Doesn't need Mutex because it's designed to process one command at a time sequentially pub struct WorkerServer { - connection: Option, - client_tx: mpsc::UnboundedSender, - client_rx: mpsc::UnboundedReceiver, -} - -impl Default for WorkerServer { - fn default() -> Self { - let (client_tx, client_rx) = mpsc::unbounded_channel(); - - WorkerServer { - connection: None, - client_tx, - client_rx, - } - } + port: MessagePort, + command_rx: mpsc::UnboundedReceiver, + onmessage: Closure, } impl WorkerServer { - pub fn new() -> Self { - console::log_1(&"👷🏼‍♂️ WorkerServer created ✔️".into()); + pub fn new(port: MessagePort) -> Result { + let (command_tx, command_rx) = mpsc::unbounded_channel(); - Self::default() - } + let onmessage = Closure::new(move |message_event: MessageEvent| { + match from_value(message_event.data()) { + Ok(command) => { + if let Err(e) = command_tx.send(command) { + console::error_1(&format!("Failed to process command: {}", e).into()); + } + } + Err(e) => { + console::error_1(&format!("Failed to deserialize command: {}", e).into()); + } + } + }); - pub fn initialize(&mut self, port: MessagePort) -> Result<(), JsError> { - self.connection = Some(ClientConnection::new(port, self.client_tx.clone())?); - Ok(()) + port.set_onmessage(Some(onmessage.as_ref().unchecked_ref())); + console::log_1(&"✅ WorkerServer initialized".into()); + + Ok(WorkerServer { + port, + command_rx, + onmessage, + }) } pub async fn recv(&mut self) -> Result { - self.client_rx + self.command_rx .recv() .await .ok_or_else(|| JsError::new("Channel closed")) } pub fn respond(&self, response: WorkerResponse) { - if let Some(connection) = &self.connection { - if let Err(e) = connection.send(&response) { - web_sys::console::error_1(&format!("Failed to send response: {:?}", &e).into()); + match to_value(&response) { + Ok(response_value) => { + if let Err(e) = self.port.post_message(&response_value) { + console::error_1(&format!("Failed to send response: {:?}", e).into()); + } + } + Err(e) => { + console::error_1(&format!("Failed to serialize response: {:?}", e).into()); } } } diff --git a/crates/tests/src/lib.rs b/crates/tests/src/lib.rs index 4c0dca8b..41bc5bba 100644 --- a/crates/tests/src/lib.rs +++ b/crates/tests/src/lib.rs @@ -32,16 +32,17 @@ async fn test_light_client_prover_talking() -> Result<()> { pretty_env_logger::init(); let bridge_cfg = CelestiaConfig { - connection_string: "ws://0.0.0.0:36658".to_string(), + connection_string: "ws://0.0.0.0:26658".to_string(), + start_height: 3093707, ..CelestiaConfig::default() }; - let lc_cfg = CelestiaConfig { + /* let lc_cfg = CelestiaConfig { connection_string: "ws://0.0.0.0:26658".to_string(), ..CelestiaConfig::default() - }; + }; */ let bridge_da_layer = Arc::new(CelestiaConnection::new(&bridge_cfg, None).await.unwrap()); - let lc_da_layer = Arc::new(CelestiaConnection::new(&lc_cfg, None).await.unwrap()); + /* let lc_da_layer = Arc::new(CelestiaConnection::new(&lc_cfg, None).await.unwrap()); */ let db = setup_db(); let signing_key = create_signing_key(); let pubkey = signing_key.verification_key(); @@ -50,6 +51,7 @@ async fn test_light_client_prover_talking() -> Result<()> { let _service = test_state.register_service("test_service".to_string()); let prover_cfg = prism_prover::Config { key: signing_key, + start_height: 3093707, ..prism_prover::Config::default() }; @@ -59,20 +61,20 @@ async fn test_light_client_prover_talking() -> Result<()> { &prover_cfg, )?); - let lightclient = Arc::new(LightClient::new(lc_da_layer.clone(), lc_cfg, Some(pubkey))); - + /* let lightclient = Arc::new(LightClient::new(lc_da_layer.clone(), lc_cfg, Some(pubkey))); + */ let prover_clone = prover.clone(); spawn(async move { debug!("starting prover"); prover_clone.run().await.unwrap(); }); - let lc_clone = lightclient.clone(); - spawn(async move { - debug!("starting light client"); - lc_clone.run().await.unwrap(); - }); - + /* let lc_clone = lightclient.clone(); + spawn(async move { + debug!("starting light client"); + lc_clone.run().await.unwrap(); + }); + */ spawn(async move { let mut rng = StdRng::from_entropy(); @@ -123,7 +125,7 @@ async fn test_light_client_prover_talking() -> Result<()> { } }); - let mut rx = lc_da_layer.clone().subscribe_to_heights(); + let mut rx = bridge_da_layer.clone().subscribe_to_heights(); let initial_height = rx.recv().await.unwrap(); while let Ok(height) = rx.recv().await { debug!("received height {}", height);