Commit

Address review comments.
td202 committed Nov 2, 2023
1 parent 7348409 commit d1d1b11
Showing 3 changed files with 53 additions and 96 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -36,6 +36,7 @@ num-bigint = "0.4"
num-traits = "0.2"
tokio-postgres = { version = "^0.7.8", features = ["with-serde_json-1"], optional = true }
http = "0.2"
+ tokio-stream = "0.1"

concordium_base = { version = "3.0.1", path = "./concordium-base/rust-src/concordium_base/", features = ["encryption"] }
concordium-smart-contract-engine = { version = "3.0", path = "./concordium-base/smart-contracts/wasm-chain-integration/", default-features = false, features = ["async"]}
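
The new tokio-stream dependency supplies the `UnboundedReceiverStream` wrapper that the reworked `src/v2/dry_run.rs` below uses to treat a tokio channel receiver as a `Stream`. A minimal, self-contained sketch of that adapter on its own (illustrative only, not code from this commit):

    use futures::StreamExt;
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::UnboundedReceiverStream;

    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::unbounded_channel();
        tx.send("hello").unwrap();
        // Dropping the sender closes the channel, so the stream terminates.
        drop(tx);

        let mut stream = UnboundedReceiverStream::from(rx);
        assert_eq!(stream.next().await, Some("hello"));
        assert_eq!(stream.next().await, None);
    }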
2 changes: 1 addition & 1 deletion examples/v2_dry_run.rs
@@ -20,7 +20,7 @@ struct App {
#[structopt(
long = "node",
help = "GRPC interface of the node.",
default_value = "http://localhost:25162"
default_value = "http://localhost:20000"
)]
endpoint: v2::Endpoint,
}
146 changes: 51 additions & 95 deletions src/v2/dry_run.rs
@@ -23,95 +23,51 @@ use concordium_base::{
use futures::*;

mod shared_receiver {
use futures::{stream::Stream, StreamExt};
use tokio::{
sync::{mpsc, oneshot},
task::JoinHandle,
};

use futures::{lock::Mutex, stream::FusedStream, *};
use std::{collections::LinkedList, sync::Arc};

/// A stream together with a queue of pending requests for items from the
/// stream that have not yet been polled. This is used to allow multiple
/// readers of the stream to be sequenced.
struct InnerSharedReceiver<S>
/// A `SharedReceiver` wraps an underlying stream so that multiple clients
/// can queue to receive items from the stream.
pub struct SharedReceiver<S>
where
S: Stream, {
/// The underlying stream.
src: S,
/// The queue of pending receivers.
pending: LinkedList<tokio::sync::oneshot::Sender<Option<S::Item>>>,
senders: mpsc::UnboundedSender<oneshot::Sender<S::Item>>,
task: JoinHandle<()>,
}

/// A pending item to be received from a [`SharedReceiver``].
pub struct SharedReceiverItem<S>
where
S: Stream, {
/// The item, if it has already been read from the stream.
value: tokio::sync::oneshot::Receiver<Option<S::Item>>,
/// The shared receiver.
receiver: Arc<Mutex<InnerSharedReceiver<S>>>,
impl<S: Stream> Drop for SharedReceiver<S> {
fn drop(&mut self) { self.task.abort(); }
}

/// A `SharedReceiver` wraps an underlying stream so that multiple clients
/// can queue to receive items from the queue.
pub struct SharedReceiver<S>
impl<S: Stream + Unpin + Send + 'static> SharedReceiver<S>
where
S: Stream, {
inner: Arc<Mutex<InnerSharedReceiver<S>>>,
}

impl<S: Stream> SharedReceiver<S> {
/// Construct a shared receiver from a stream.
S::Item: Send,
{
/// Construct a new shared receiver. This spawns a background task that
/// pairs waiters with incoming stream items.
pub fn new(stream: S) -> Self {
let inner = InnerSharedReceiver {
src: stream,
pending: LinkedList::new(),
};
SharedReceiver {
inner: Arc::new(Mutex::new(inner)),
}
}

/// Get a [`SharedReceiverItem`] that can be used to receive the next
/// item from the stream. This can be thought of as reserving a
/// place in the queue to receive an item from the stream.
pub async fn next(&self) -> SharedReceiverItem<S> {
let (item_sender, item_receiver) = tokio::sync::oneshot::channel();
self.inner.lock().await.pending.push_back(item_sender);
SharedReceiverItem {
value: item_receiver,
receiver: self.inner.clone(),
}
let (senders, rec_senders) = mpsc::unbounded_channel::<oneshot::Sender<S::Item>>();
let task = tokio::task::spawn(async {
let mut zipped = stream.zip(tokio_stream::wrappers::UnboundedReceiverStream::from(
rec_senders,
));
while let Some((item, sender)) = zipped.next().await {
let _ = sender.send(item);
}
});
SharedReceiver { senders, task }
}
}

impl<S: Stream + Unpin + FusedStream> SharedReceiverItem<S> {
/// Receive an item from the stream. Since the `SharedReceiverItem` is
/// consumed in the process, this can only occur once. Receiving
/// is cooperative in that we receive items from the stream on behalf of
/// other `SharedReceiveItem`s until we have received our own.
pub async fn receive(mut self) -> Option<S::Item> {
use tokio::sync::oneshot::error::TryRecvError::*;
// Check if we have already received our item. If so, we are done.
match self.value.try_recv() {
Ok(v) => return v,
Err(Closed) => return None,
Err(Empty) => {}
}
let mut receiver = self.receiver.lock().await;
loop {
// We check at the start of the loop since it is possible that another thread
// received for us since we acquired the lock.
match self.value.try_recv() {
Ok(v) => return v,
Err(Closed) => return None,
Err(Empty) => {}
}
// Receive the next item from the stream to send to the next waiting receiver.
let val = receiver.src.next().await;
// Since we have not received our value, the pending queue cannot be empty.
let next_item = receiver.pending.pop_front().unwrap();
// We discard the result because we do not care if the receiver has already been
// dropped.
let _ = next_item.send(val);
}
/// Claim the next item from the stream when it becomes available.
/// Returns `None` if the stream is already closed. Otherwise returns a
/// [`oneshot::Receiver`] that can be `await`ed to retrieve the item.
pub fn next(&self) -> Option<oneshot::Receiver<S::Item>> {
let (send, recv) = oneshot::channel();
self.senders.send(send).ok()?;
Some(recv)
}
}
}
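
For orientation, a hypothetical usage sketch of the reworked `SharedReceiver`: each call to `next` reserves a place in the queue, and the spawned background task pairs queued waiters with stream items in order. (The direct import is illustrative; in the SDK the `shared_receiver` module is private to `dry_run.rs`, where the wrapped stream is the dry-run response stream.)

    use futures::stream;
    use shared_receiver::SharedReceiver; // hypothetical: assumes the module is in scope

    #[tokio::main]
    async fn main() {
        // Wrap a plain stream of integers as a stand-in for the response stream.
        let receiver = SharedReceiver::new(stream::iter(vec![1, 2, 3]));

        // Reserve two slots up front; items are handed out in FIFO order.
        let first = receiver.next().expect("stream not closed");
        let second = receiver.next().expect("stream not closed");

        // Each reservation is a oneshot receiver that resolves once the
        // background task pairs it with the next stream item.
        assert_eq!(first.await.ok(), Some(1));
        assert_eq!(second.await.ok(), Some(2));
    }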
@@ -603,12 +559,7 @@ impl<P: PayloadLike> From<concordium_base::transactions::AccountTransaction<P>>
.signature
.signatures
.into_iter()
- .map(|(c, v)| {
- v.into_iter()
- .map(|(k, _)| (c.clone(), k))
- .collect::<Vec<(CredentialIndex, KeyIndex)>>()
- })
- .flatten()
+ .flat_map(|(c, v)| std::iter::repeat(c).zip(v.into_keys()))
.collect(),
}
}
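
The iterator change above collapses a map-collect-flatten pipeline into a single `flat_map`, pairing each credential index with its key indices without building an intermediate `Vec`. A standalone illustration of the idiom, with `u8` stand-ins for `CredentialIndex` and `KeyIndex`:

    use std::collections::BTreeMap;

    fn main() {
        // Credential index -> (key index -> signature), as a simplified stand-in
        // for the transaction signature map.
        let signatures: BTreeMap<u8, BTreeMap<u8, &str>> = BTreeMap::from([
            (0, BTreeMap::from([(0, "sig00"), (1, "sig01")])),
            (1, BTreeMap::from([(0, "sig10")])),
        ]);

        // Repeat the credential index and zip it with the key indices.
        let pairs: Vec<(u8, u8)> = signatures
            .into_iter()
            .flat_map(|(c, v)| std::iter::repeat(c).zip(v.into_keys()))
            .collect();

        assert_eq!(pairs, vec![(0, 0), (0, 1), (1, 0)]);
    }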
@@ -1266,6 +1217,11 @@ impl DryRun {
/// recommended to close the request stream if the [`DryRun`] object will
/// be retained for any significant length of time after the last request is
/// made.
+ ///
+ /// Note that dropping the [`DryRun`] object will stop the background task
+ /// that services in-flight requests, so it should not be dropped before
+ /// `await`ing any such requests. Closing the request stream does not stop
+ /// the background task.
pub fn close(&mut self) { self.request_send = None; }
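
The added note documents a consequence of the `Drop` implementation introduced above: aborting the background task drops any queued `oneshot` senders, so requests still in flight resolve to an error rather than hanging. A standalone illustration of that mechanism (not the `DryRun` API itself):

    use tokio::sync::oneshot;

    #[tokio::main]
    async fn main() {
        let (tx, rx) = oneshot::channel::<u32>();
        // Simulate the background task being stopped before it can respond:
        // dropping the sender makes the pending receiver resolve to an error.
        drop(tx);
        assert!(rx.await.is_err());
    }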

/// Helper function that issues a dry-run request and returns a future for
@@ -1274,19 +1230,19 @@ impl DryRun {
&mut self,
request: generated::DryRunRequest,
) -> tonic::Result<impl Future<Output = Option<tonic::Result<generated::DryRunResponse>>>> {
- let sender = self
- .request_send
- .as_mut()
- .ok_or_else(|| tonic::Status::cancelled("dry run already completed"))?;
- match sender.send(request).await {
- Ok(_) => Ok(self.response_recv.next().await.receive()),
+ let lazy_cancelled = || tonic::Status::cancelled("dry run already completed");
+ let sender = self.request_send.as_mut().ok_or_else(lazy_cancelled)?;
+ let send_result = sender.send(request).await;
+ let receive_result = self.response_recv.next().ok_or_else(lazy_cancelled);
+ match send_result {
+ Ok(_) => receive_result.map(|r| r.map(|x| x.ok())),
Err(_) => {
- // In this case, the server must have closed the stream. We query the
- // response stream to see if there is an error indicating the reason.
- if let Some(Err(e)) = self.response_recv.next().await.receive().await {
- Err(e)?
+ // In this case, the server must have closed the stream. We query the response
+ // stream to see if there is an error indicating the reason.
+ if let Ok(Err(e)) = receive_result?.await {
+ Err(e)
} else {
- Err(tonic::Status::cancelled("dry run already completed"))?
+ Err(lazy_cancelled())
}
}
}
