Skip to content

Commit

Permalink
Update initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreiEres committed Mar 27, 2023
1 parent 1b02175 commit bbeb431
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 56 deletions.
65 changes: 23 additions & 42 deletions src/core/telemetry_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,15 @@
// along with polkadot-introspector. If not, see <http://www.gnu.org/licenses/>.
//

use super::{EventConsumerInit, EventStream, TelemetryFeed, MAX_MSG_QUEUE_SIZE};
use async_trait::async_trait;
use super::TelemetryFeed;
use color_eyre::Report;
use futures::future;
use futures_util::StreamExt;
use log::info;
use tokio::{
net::TcpStream,
sync::{
broadcast::Sender as BroadcastSender,
mpsc::{channel, error::SendError, Sender},
mpsc::{error::SendError, Sender},
},
};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
Expand All @@ -42,45 +40,15 @@ pub struct TelemetrySubscription {
consumers: Vec<Sender<TelemetryEvent>>,
}

#[async_trait]
impl EventStream for TelemetrySubscription {
type Event = TelemetryEvent;

/// Create a new consumer of events. Returns consumer initialization data.
fn create_consumer(&mut self) -> EventConsumerInit<Self::Event> {
let (update_tx, update_rx) = channel(MAX_MSG_QUEUE_SIZE);
self.consumers.push(update_tx);

EventConsumerInit::new(vec![update_rx])
}

async fn run(
self,
tasks: Vec<tokio::task::JoinHandle<()>>,
shutdown_tx: BroadcastSender<()>,
) -> color_eyre::Result<()> {
let mut futures = self
.consumers
.into_iter()
.map(|update_channel| tokio::spawn(Self::run_per_consumer(update_channel, shutdown_tx.clone())))
.collect::<Vec<_>>();

futures.extend(tasks);
future::try_join_all(futures).await?;

Ok(())
}
}

impl TelemetrySubscription {
pub fn new() -> Self {
Self { consumers: Vec::new() }
pub fn new(consumers: Vec<Sender<TelemetryEvent>>) -> Self {
Self { consumers }
}

// Sets up per websocket tasks to handle updates and reconnects on errors.
async fn run_per_consumer(update_channel: Sender<TelemetryEvent>, shutdown_tx: BroadcastSender<()>) {
async fn run_per_consumer(update_channel: Sender<TelemetryEvent>, url: String, shutdown_tx: BroadcastSender<()>) {
let mut shutdown_rx = shutdown_tx.subscribe();
let mut ws_stream = telemetry_stream().await;
let mut ws_stream = telemetry_stream(&url).await;

loop {
tokio::select! {
Expand All @@ -97,8 +65,7 @@ impl TelemetrySubscription {
}

for message in feed.unwrap() {
// TODO: change to info
println!("[telemetry] {:?}", message);
info!("[telemetry] {:?}", message);
if let Err(e) = update_channel.send(TelemetryEvent::NewMessage(message)).await {
return on_consumer_error(e);
}
Expand All @@ -110,10 +77,24 @@ impl TelemetrySubscription {
}
}
}

pub async fn run(
self,
url: String,
shutdown_tx: BroadcastSender<()>,
) -> color_eyre::Result<Vec<tokio::task::JoinHandle<()>>> {
Ok(self
.consumers
.into_iter()
.map(|update_channel| {
tokio::spawn(Self::run_per_consumer(update_channel, url.clone(), shutdown_tx.clone()))
})
.collect::<Vec<_>>())
}
}

async fn telemetry_stream() -> WebSocketStream<MaybeTlsStream<TcpStream>> {
let url = Url::parse("wss://feed.telemetry.polkadot.io/feed/").unwrap();
async fn telemetry_stream(url: &str) -> WebSocketStream<MaybeTlsStream<TcpStream>> {
let url = Url::parse(url).unwrap();
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");

ws_stream
Expand Down
52 changes: 38 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@ use clap::{ArgAction, Parser};
use color_eyre::eyre::eyre;
use futures::future;
use log::{error, LevelFilter};
use tokio::{signal, sync::broadcast};
use tokio::{
signal,
sync::broadcast::{self, Sender},
};

use block_time::BlockTimeOptions;
use jaeger::JaegerOptions;
use metadata_checker::{MetadataChecker, MetadataCheckerOptions};
use pc::ParachainCommanderOptions;
use whois::WhoIsOptions;

mod block_time;
mod core;
mod jaeger;
mod kvdb;
mod metadata_checker;
mod pc;
mod whois;

use crate::{core::EventStream, kvdb::KvdbOptions};

Expand All @@ -49,8 +54,8 @@ enum Command {
ParachainCommander(ParachainCommanderOptions),
/// Validate statically generated metadata
MetadataChecker(MetadataCheckerOptions),
/// Test telemetry feed, should be removed soon
Telemetry,
/// Define the
WhoIs(WhoIsOptions),
}

#[derive(Debug, Parser)]
Expand Down Expand Up @@ -139,19 +144,38 @@ async fn main() -> color_eyre::Result<()> {
error!("FATAL: cannot start metadata checker: {}", err)
};
},
Command::Telemetry => {
let mut core = core::TelemetrySubscription::new();
let _consumer_init = core.create_consumer();
let (shutdown_tx, _) = broadcast::channel(1);
let shutdown_tx_cpy = shutdown_tx.clone();
let mut futures = vec![];
futures.push(tokio::spawn(async move {
signal::ctrl_c().await.unwrap();
let _ = shutdown_tx_cpy.send(());
}));
core.run(futures, shutdown_tx.clone()).await?;
Command::WhoIs(opts) => {
let shutdown_tx = init_shutdown();
let futures = init_futures_with_shutdown(
whois::WhoIs::new(opts)?.run(shutdown_tx.clone()).await?,
shutdown_tx.clone(),
);
run(futures).await?
},
}

Ok(())
}

fn init_shutdown() -> Sender<()> {
let (shutdown_tx, _) = broadcast::channel(1);
shutdown_tx
}

fn init_futures_with_shutdown(
mut futures: Vec<tokio::task::JoinHandle<()>>,
shutdown_tx: Sender<()>,
) -> Vec<tokio::task::JoinHandle<()>> {
futures.push(tokio::spawn(on_shutdown(shutdown_tx)));
futures
}

async fn on_shutdown(shutdown_tx: Sender<()>) {
signal::ctrl_c().await.unwrap();
let _ = shutdown_tx.send(());
}

async fn run(futures: Vec<tokio::task::JoinHandle<()>>) -> color_eyre::Result<()> {
future::try_join_all(futures).await?;
Ok(())
}
68 changes: 68 additions & 0 deletions src/whois/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2023 Parity Technologies (UK) Ltd.
// This file is part of polkadot-introspector.
//
// polkadot-introspector is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// polkadot-introspector is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with polkadot-introspector. If not, see <http://www.gnu.org/licenses/>.

use std::time::Duration;

use clap::Parser;
use tokio::sync::{
broadcast::Sender as BroadcastSender,
mpsc::{channel, error::TryRecvError, Receiver},
};

use crate::core::{TelemetryEvent, TelemetrySubscription, MAX_MSG_QUEUE_SIZE};

#[derive(Clone, Debug, Parser)]
#[clap(rename_all = "kebab-case")]
pub(crate) struct WhoIsOptions {
/// Web-Socket URL of a telemetry backend
#[clap(name = "ws", long)]
pub url: String,
}

pub(crate) struct WhoIs {
opts: WhoIsOptions,
subscription: TelemetrySubscription,
update_channel: Receiver<TelemetryEvent>,
}

impl WhoIs {
pub(crate) fn new(opts: WhoIsOptions) -> color_eyre::Result<Self> {
let (update_tx, update_rx) = channel(MAX_MSG_QUEUE_SIZE);
Ok(Self { opts, subscription: TelemetrySubscription::new(vec![update_tx]), update_channel: update_rx })
}

pub(crate) async fn run(
self,
shutdown_tx: BroadcastSender<()>,
) -> color_eyre::Result<Vec<tokio::task::JoinHandle<()>>> {
let mut futures = self.subscription.run(self.opts.url.clone(), shutdown_tx).await?;
futures.push(tokio::spawn(Self::watch(self.update_channel)));

Ok(futures)
}

async fn watch(mut update: Receiver<TelemetryEvent>) {
loop {
match update.try_recv() {
Ok(event) => {
println!("{:?}", event);
},
Err(TryRecvError::Disconnected) => break,
Err(TryRecvError::Empty) => tokio::time::sleep(Duration::from_millis(1000)).await,
}
}
}
}

0 comments on commit bbeb431

Please sign in to comment.