-
Hello again, I was using an older version of the crate and upon updating one of the features I implemented got booted, so I wanted some feedback on how to implement it now and if it could be needed by someone else. I am running processes over SSH, and need to collect stdout. However, I am also interested into the errors that could happend and wished to process that stream as well. All the convenience methods that transform a Before the update, I defined two With the changes made since, the modules making up the ChannelStream seem to be private and cannot be fooled around easily. I then created pub struct Wrap(Receiver<Vec<u8>>, BytesMut);
impl Wrap {
fn new(rx: Receiver<Vec<u8>>) -> Self {
Self {
0: rx,
1: BytesMut::new(),
}
}
}
/// Create streams for a channel's stdout and stderr, consuming the channel in the process
pub fn into_streams<S>(mut chan: Channel<S>) -> (Wrap, Wrap)
where
S: From<(ChannelId, ChannelMsg)> + std::marker::Send + 'static + Sync,
{
let (txo, rxo) = mpsc::channel::<Vec<u8>>(1000);
let (txe, rxe) = mpsc::channel::<Vec<u8>>(1000);
tokio::spawn(async move {
loop {
match chan.wait().await {
Some(ChannelMsg::Data { data }) => {
txo.send(data[..].into())
.await
.map_err(|_| SshError::SendError)?;
}
Some(ChannelMsg::ExtendedData { data, ext }) if ext == 1 => {
txe.send(data[..].into())
.await
.map_err(|_| SshError::SendError)?;
}
Some(ChannelMsg::ExtendedData { data: _, ext }) => {
log::debug!("Received surprise data on stream {ext}");
}
Some(ChannelMsg::Eof) => {
// Send a 0-length chunk to indicate EOF.
txo.send(vec![]).await.map_err(|_| SshError::SendError)?;
break;
}
None => break,
_ => (),
}
}
chan.close().await?;
Ok::<_, SshError>(())
});
(Wrap::new(rxo), Wrap::new(rxe))
}
impl AsyncRead for Wrap {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let cache_size = self.1.len();
buf.put_slice(&self.1.split_to(usize::min(buf.remaining(), cache_size)));
if buf.remaining() > 0 {
match self.0.poll_recv(cx) {
Poll::Ready(Some(msg)) => {
self.1 = BytesMut::from(&msg[..]);
let len = self.1.len();
buf.put_slice(&self.1.split_to(usize::min(buf.remaining(), len)));
Poll::Ready(Ok(()))
}
Poll::Ready(None) => Poll::Ready(Ok(())),
Poll::Pending => Poll::Pending,
}
} else {
Poll::Ready(Ok(()))
}
}
} Please advise if you think there is a better |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 2 replies
-
You're correct, right now there's no built-in way to obtain two |
Beta Was this translation helpful? Give feedback.
You're correct, right now there's no built-in way to obtain two
AsyncRead
for stdout and stderr/extended data as they would have to simultaneously read from the same internal mpscReceiver
. Your solution is conceptually perfectly fine (without reviewing the AsyncRead impl in detail) and is how I'd implement it myself