Skip to content

Commit

Permalink
first little cleanup, mocha sampling works now, lets do a rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
sebasti810 committed Nov 8, 2024
1 parent 55a8919 commit dfe5986
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 145 deletions.
21 changes: 0 additions & 21 deletions crates/node_types/lightclient/wasm/js/src/App.tsx
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import { useState } from 'react'
import init, { LightClientWorker, WasmLightClient } from "../../pkg/prism_wasm_lightclient.js";
/* import OperationSubmitter from './components/OperationSubmitter.js'; */

function App() {
const [status, setStatus] = useState('Not started')
const [height, setHeight] = useState(0)
const [client, setClient] = useState<WasmLightClient | null>(null)

const startLightClient = async () => {
try {
Expand All @@ -17,7 +15,6 @@ function App() {
worker.run()

const client = await new WasmLightClient(channel.port2)
setClient(client);
setStatus('Running')

channel.port2.onmessage = (event) => {
Expand All @@ -36,15 +33,6 @@ function App() {
}
}

/* const verifyRandomEpoch = async () => {
try {
setStatus('Verifying epoch...')
setStatus('Epoch verified successfully')
} catch (error) {
setStatus(`Verification failed: ${error}`)
}
} */

return (
<div className="max-w-2xl mx-auto p-6">
<h1 className="text-2xl font-bold mb-6">Light Client Test</h1>
Expand All @@ -67,16 +55,7 @@ function App() {
>
Start Light Client
</button>

{/* <button
onClick={verifyRandomEpoch}
disabled={!client}
className="bg-green-500 text-white px-4 py-2 rounded hover:bg-green-600 disabled:bg-gray-400"
>
Verify Random Epoch
</button> */}
</div>
{/* client && <OperationSubmitter /> */}
</div>
)
}
Expand Down
2 changes: 0 additions & 2 deletions crates/node_types/lightclient/wasm/js/src/index.css
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ a:hover {
body {
margin: 0;
display: flex;
place-items: center;
min-width: 320px;
min-height: 100vh;
}

Expand Down
13 changes: 4 additions & 9 deletions crates/node_types/lightclient/wasm/src/celestia/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,12 @@ pub struct WasmCelestiaClient {

impl WasmCelestiaClient {
pub async fn new(config: CelestiaConfig) -> Result<Self, JsError> {
let bridge_addr = CelestiaConfig::fetch_bridge_webtransport_multiaddr().await;

console::log_2(
&"🚀 Bridge address:".into(),
&bridge_addr.to_string().into(),
);
/* let bridge_addr = CelestiaConfig::fetch_bridge_webtransport_multiaddr().await; */

let current_height = Arc::new(AtomicU64::new(config.start_height));

let mut wasm_node_config = WasmNodeConfig::default(Network::Private);
wasm_node_config.set_bridge_bootnode(bridge_addr.to_string());
let mut wasm_node_config = WasmNodeConfig::default(Network::Mocha);
wasm_node_config.set_bridge_bootnode(/* bridge_addr.to_string() */);
let node_config = wasm_node_config.initialize_node_config().await?;

let (node, mut event_subscriber) = Node::new_subscribed(node_config).await?;
Expand Down Expand Up @@ -154,7 +149,7 @@ impl WasmCelestiaClient {
.request_all_blobs(
&header,
Namespace::new_v0(&namespace).unwrap(),
Some(Duration::from_secs(5)),
Some(Duration::from_secs(7)),
)
.await
{
Expand Down
18 changes: 12 additions & 6 deletions crates/node_types/lightclient/wasm/src/celestia/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
use celestia_rpc::{Client, P2PClient};
use libp2p::{identity::Keypair, multiaddr::Protocol, Multiaddr};
use lumina_node::{
blockstore::IndexedDbBlockstore, events::NodeEvent, network::network_id, store::IndexedDbStore,
NodeConfig,
blockstore::IndexedDbBlockstore, network::network_id, store::IndexedDbStore, NodeConfig,
};
use lumina_node_wasm::{
client::WasmNodeConfig,
Expand All @@ -27,7 +26,7 @@ impl Default for CelestiaConfig {
fn default() -> Self {
Self {
node_url: "ws://localhost:26658".to_string(),
start_height: 0,
start_height: 3093687,
snark_namespace_id: "00000000000000de1008".to_string(),
operation_namespace_id: Some("00000000000000de1009".to_string()),
}
Expand Down Expand Up @@ -66,7 +65,7 @@ pub trait WasmNodeConfigExt {
&self,
) -> Result<NodeConfig<IndexedDbBlockstore, IndexedDbStore>, JsError>;

fn set_bridge_bootnode(&mut self, bridge_addr: String);
fn set_bridge_bootnode(&mut self /* , bridge_addr: String */);
}

impl WasmNodeConfigExt for WasmNodeConfig {
Expand All @@ -88,6 +87,7 @@ impl WasmNodeConfigExt for WasmNodeConfig {

// Process bootnodes
let mut p2p_bootnodes = Vec::with_capacity(self.bootnodes.len());

for addr in &self.bootnodes {
console::log_1(&format!("🚀 Adding bootnode: {}", addr).into());
let addr = addr
Expand Down Expand Up @@ -116,7 +116,13 @@ impl WasmNodeConfigExt for WasmNodeConfig {
})
}

fn set_bridge_bootnode(&mut self, bridge_addr: String) {
self.bootnodes = vec![bridge_addr];
fn set_bridge_bootnode(&mut self /* , bridge_addr: String */) {
self.bootnodes = vec![
"/dnsaddr/da-bridge-mocha-4.celestia-mocha.com/p2p/12D3KooWCBAbQbJSpCpCGKzqz3rAN4ixYbc63K68zJg9aisuAajg".to_string(),
"/dnsaddr/da-bridge-mocha-4-2.celestia-mocha.com/p2p/12D3KooWK6wJkScGQniymdWtBwBuU36n6BRXp9rCDDUD6P5gJr3G".to_string(),
"/dnsaddr/da-full-1-mocha-4.celestia-mocha.com/p2p/12D3KooWCUHPLqQXZzpTx1x3TAsdn3vYmTNDhzg66yG8hqoxGGN8".to_string(),
"/dnsaddr/da-full-2-mocha-4.celestia-mocha.com/p2p/12D3KooWR6SHsXPkkvhCRn6vp1RqSefgaT1X1nMNvrVjU2o3GoYy".to_string(),
];
/* self.bootnodes = vec![bridge_addr]; */
}
}
12 changes: 8 additions & 4 deletions crates/node_types/lightclient/wasm/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@ pub struct WasmLightClient {
impl WasmLightClient {
#[wasm_bindgen(constructor)]
pub async fn new(port: MessagePort) -> Result<WasmLightClient, JsError> {
let worker = WorkerClient::new(port)?;
Ok(Self { worker })
Ok(Self {
worker: WorkerClient::new(port)?,
})
}

#[wasm_bindgen(js_name = verifyEpoch)]
pub async fn verify_epoch(&self, height: u64) -> Result<(), JsError> {
let command = LightClientCommand::VerifyEpoch { height };
match self.worker.exec(command).await? {
match self
.worker
.exec(LightClientCommand::VerifyEpoch { height })
.await?
{
WorkerResponse::EpochVerified => Ok(()),
WorkerResponse::Error(e) => Err(JsError::new(&e)),
_ => Err(JsError::new("Unexpected response")),
Expand Down
13 changes: 4 additions & 9 deletions crates/node_types/lightclient/wasm/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@ pub struct LightClientWorker {
impl LightClientWorker {
#[wasm_bindgen(constructor)]
pub async fn new(port: MessagePort) -> Result<LightClientWorker, JsError> {
console::log_1(&"• Initializing LightClientWorker ✔".into());
let mut server = WorkerServer::new();
server.initialize(port)?;

let celestia = WasmCelestiaClient::new(CelestiaConfig::default()).await?;

console::log_1(&"• Server registered ✔".into());

Ok(Self { server, celestia })
Ok(Self {
server: WorkerServer::new(port)?,
celestia: WasmCelestiaClient::new(CelestiaConfig::default()).await?,
})
}

pub async fn run(&mut self) -> Result<(), JsError> {
Expand Down
127 changes: 46 additions & 81 deletions crates/node_types/lightclient/wasm/src/worker_communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,51 +5,8 @@ use web_sys::{console, MessageEvent, MessagePort};

use crate::commands::{LightClientCommand, WorkerResponse};

struct ClientConnection {
port: MessagePort,
onmessage: Closure<dyn Fn(MessageEvent)>,
}

impl ClientConnection {
fn new(
port: MessagePort,
server_tx: mpsc::UnboundedSender<LightClientCommand>,
) -> Result<Self, JsError> {
// We need the Closure because it's how we handle incoming messages from the MessagePort. It's basically our event handler.
let onmessage = Closure::new(move |message_event: MessageEvent| {
match from_value(message_event.data()) {
Ok(command) => {
if let Err(e) = server_tx.send(command) {
web_sys::console::error_1(&format!("Failed to send command: {}", e).into());
}
}
Err(e) => {
web_sys::console::error_1(
&format!("Failed to deserialize message: {}", e).into(),
);
}
}
});

port.set_onmessage(Some(onmessage.as_ref().unchecked_ref()));

Ok(ClientConnection { port, onmessage })
}

fn send(&self, message: &WorkerResponse) -> Result<(), JsError> {
let message_value = to_value(message)?;
self.port
.post_message(&message_value)
.map_err(|e| JsError::new(&format!("Failed to post message: {:?}", e)))?;
Ok(())
}
}

impl Drop for ClientConnection {
fn drop(&mut self) {
self.port.set_onmessage(None);
}
}
// WorkerClient: Sends commands and receives responses in the main thread
// WorkerServer: Receives commands and sends responses in the worker thread

pub struct WorkerClient {
port: MessagePort,
Expand All @@ -62,8 +19,15 @@ impl WorkerClient {
let (response_tx, response_rx) = mpsc::unbounded_channel();

let onmessage = Closure::new(move |message_event: MessageEvent| {
if let Ok(response) = from_value(message_event.data()) {
response_tx.send(Ok(response));
match from_value(message_event.data()) {
Ok(response) => {
if let Err(e) = response_tx.send(Ok(response)) {
console::error_1(&format!("Failed to forward response: {}", e).into());
}
}
Err(e) => {
console::error_1(&format!("Failed to deserialize response: {}", e).into());
}
}
});

Expand All @@ -79,10 +43,9 @@ impl WorkerClient {
pub async fn exec(&self, command: LightClientCommand) -> Result<WorkerResponse, JsError> {
let mut response_channel = self.response_channel.lock().await;
console::log_2(&"🩺 executing".into(), &to_value(&command)?);
let command_value = to_value(&command)?;

self.port
.post_message(&command_value)
.post_message(&to_value(&command)?)
.map_err(|e| JsError::new(&format!("Failed to post message: {:?}", e)))?;

console::log_1(&"📨 message posted".into());
Expand All @@ -95,54 +58,56 @@ impl WorkerClient {
}
}

impl Drop for WorkerClient {
fn drop(&mut self) {
self.port.set_onmessage(None);
}
}

// Doesn't need Mutex because it's designed to process one command at a time sequentially
pub struct WorkerServer {
connection: Option<ClientConnection>,
client_tx: mpsc::UnboundedSender<LightClientCommand>,
client_rx: mpsc::UnboundedReceiver<LightClientCommand>,
}

impl Default for WorkerServer {
fn default() -> Self {
let (client_tx, client_rx) = mpsc::unbounded_channel();

WorkerServer {
connection: None,
client_tx,
client_rx,
}
}
port: MessagePort,
command_rx: mpsc::UnboundedReceiver<LightClientCommand>,
onmessage: Closure<dyn Fn(MessageEvent)>,
}

impl WorkerServer {
pub fn new() -> Self {
console::log_1(&"👷🏼‍♂️ WorkerServer created ✔️".into());
pub fn new(port: MessagePort) -> Result<Self, JsError> {
let (command_tx, command_rx) = mpsc::unbounded_channel();

Self::default()
}
let onmessage = Closure::new(move |message_event: MessageEvent| {
match from_value(message_event.data()) {
Ok(command) => {
if let Err(e) = command_tx.send(command) {
console::error_1(&format!("Failed to process command: {}", e).into());
}
}
Err(e) => {
console::error_1(&format!("Failed to deserialize command: {}", e).into());
}
}
});

pub fn initialize(&mut self, port: MessagePort) -> Result<(), JsError> {
self.connection = Some(ClientConnection::new(port, self.client_tx.clone())?);
Ok(())
port.set_onmessage(Some(onmessage.as_ref().unchecked_ref()));
console::log_1(&"✅ WorkerServer initialized".into());

Ok(WorkerServer {
port,
command_rx,
onmessage,
})
}

pub async fn recv(&mut self) -> Result<LightClientCommand, JsError> {
self.client_rx
self.command_rx
.recv()
.await
.ok_or_else(|| JsError::new("Channel closed"))
}

pub fn respond(&self, response: WorkerResponse) {
if let Some(connection) = &self.connection {
if let Err(e) = connection.send(&response) {
web_sys::console::error_1(&format!("Failed to send response: {:?}", &e).into());
match to_value(&response) {
Ok(response_value) => {
if let Err(e) = self.port.post_message(&response_value) {
console::error_1(&format!("Failed to send response: {:?}", e).into());
}
}
Err(e) => {
console::error_1(&format!("Failed to serialize response: {:?}", e).into());
}
}
}
Expand Down
Loading

0 comments on commit dfe5986

Please sign in to comment.