Skip to content

Commit

Permalink
Add Context::get_stream_no_info
Browse files Browse the repository at this point in the history
This methods allows for getting the handler for `Stream` without calling
the server info API.
It can be useful when user wants to perform single operation on many
streams.

Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Sep 2, 2024
1 parent b2fb1b4 commit 67dcd95
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 46 deletions.
47 changes: 46 additions & 1 deletion async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,10 @@ impl Context {
/// # Ok(())
/// # }
/// ```
pub async fn create_stream<S>(&self, stream_config: S) -> Result<Stream, CreateStreamError>
pub async fn create_stream<S>(
&self,
stream_config: S,
) -> Result<Stream<Info>, CreateStreamError>
where
Config: From<S>,
{
Expand Down Expand Up @@ -307,10 +310,50 @@ impl Context {
Response::Ok(info) => Ok(Stream {
context: self.clone(),
info,
name: config.name,
}),
}
}

/// Checks for [Stream] existence on the server and returns handle to it.
/// That handle can be used to manage and use [Consumer].
/// This variant does not fetch [Stream] info from the server.
/// It means it does not check if the stream actually exists.
/// If you run more operations on few streams, it is better to use [Context::get_stream] instead.
/// If you however run single operations on many streams, this method is more efficient.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let stream = jetstream.get_stream("events").await?;
/// # Ok(())
/// # }
/// ```
pub async fn get_stream_no_info<T: AsRef<str>>(
&self,
stream: T,
) -> Result<Stream<()>, GetStreamError> {
let stream = stream.as_ref();
if stream.is_empty() {
return Err(GetStreamError::new(GetStreamErrorKind::EmptyName));
}

if !is_valid_name(stream) {
return Err(GetStreamError::new(GetStreamErrorKind::InvalidStreamName));
}

Ok(Stream {
context: self.clone(),
info: (),
name: stream.to_string(),
})
}

/// Checks for [Stream] existence on the server and returns handle to it.
/// That handle can be used to manage and use [Consumer].
///
Expand Down Expand Up @@ -348,6 +391,7 @@ impl Context {
Response::Ok(info) => Ok(Stream {
context: self.clone(),
info,
name: stream.to_string(),
}),
}
}
Expand Down Expand Up @@ -404,6 +448,7 @@ impl Context {
Response::Ok(info) => Ok(Stream {
context: self.clone(),
info,
name: config.name,
}),
}
}
Expand Down
86 changes: 42 additions & 44 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,13 @@ pub type DeleteMessageError = Error<DeleteMessageErrorKind>;

/// Handle to operations that can be performed on a `Stream`.
#[derive(Debug, Clone)]
pub struct Stream {
pub(crate) info: Info,
pub struct Stream<T = Info> {
pub(crate) info: T,
pub(crate) context: Context,
pub(crate) name: String,
}

impl Stream {
impl Stream<Info> {
/// Retrieves `info` about [Stream] from the server, updates the cached `info` inside
/// [Stream] and returns it.
///
Expand Down Expand Up @@ -175,7 +176,9 @@ impl Stream {
pub fn cached_info(&self) -> &Info {
&self.info
}
}

impl<I> Stream<I> {
/// Gets next message for a [Stream].
///
/// Requires a [Stream] with `allow_direct` set to `true`.
Expand Down Expand Up @@ -218,10 +221,7 @@ impl Stream {
if !is_valid_subject(&subject) {
return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
}
let request_subject = format!(
"{}.DIRECT.GET.{}",
&self.context.prefix, &self.info.config.name
);
let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
let payload;
if let Some(sequence) = sequence {
payload = json!({
Expand All @@ -246,6 +246,7 @@ impl Stream {
message,
context: self.context.clone(),
})?;

if let Some(status) = response.status {
if let Some(ref description) = response.description {
match status {
Expand Down Expand Up @@ -306,10 +307,7 @@ impl Stream {
if !is_valid_subject(&subject) {
return Err(DirectGetError::new(DirectGetErrorKind::InvalidSubject));
}
let request_subject = format!(
"{}.DIRECT.GET.{}",
&self.context.prefix, &self.info.config.name
);
let request_subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
let payload = json!({
"next_by_subj": subject.as_ref(),
});
Expand Down Expand Up @@ -380,10 +378,7 @@ impl Stream {
/// # }
/// ```
pub async fn direct_get(&self, sequence: u64) -> Result<StreamMessage, DirectGetError> {
let subject = format!(
"{}.DIRECT.GET.{}",
&self.context.prefix, &self.info.config.name
);
let subject = format!("{}.DIRECT.GET.{}", &self.context.prefix, &self.name);
let payload = json!({
"seq": sequence,
});
Expand Down Expand Up @@ -454,7 +449,7 @@ impl Stream {
let subject = format!(
"{}.DIRECT.GET.{}.{}",
&self.context.prefix,
&self.info.config.name,
&self.name,
subject.as_ref()
);

Expand Down Expand Up @@ -508,7 +503,7 @@ impl Stream {
/// # }
/// ```
pub async fn get_raw_message(&self, sequence: u64) -> Result<StreamMessage, RawMessageError> {
let subject = format!("STREAM.MSG.GET.{}", &self.info.config.name);
let subject = format!("STREAM.MSG.GET.{}", &self.name);
let payload = json!({
"seq": sequence,
});
Expand Down Expand Up @@ -567,7 +562,7 @@ impl Stream {
&self,
stream_subject: &str,
) -> Result<StreamMessage, LastRawMessageError> {
let subject = format!("STREAM.MSG.GET.{}", &self.info.config.name);
let subject = format!("STREAM.MSG.GET.{}", &self.name);
let payload = json!({
"last_by_subj": stream_subject,
});
Expand Down Expand Up @@ -619,7 +614,7 @@ impl Stream {
/// # }
/// ```
pub async fn delete_message(&self, sequence: u64) -> Result<bool, DeleteMessageError> {
let subject = format!("STREAM.MSG.DELETE.{}", &self.info.config.name);
let subject = format!("STREAM.MSG.DELETE.{}", &self.name);
let payload = json!({
"seq": sequence,
});
Expand Down Expand Up @@ -717,7 +712,7 @@ impl Stream {
config: C,
) -> Result<Consumer<C>, ConsumerError> {
self.context
.create_consumer_on_stream(config, self.info.config.name.clone())
.create_consumer_on_stream(config, self.name.clone())
.await
}

Expand Down Expand Up @@ -750,7 +745,7 @@ impl Stream {
config: C,
) -> Result<Consumer<C>, ConsumerUpdateError> {
self.context
.update_consumer_on_stream(config, self.info.config.name.clone())
.update_consumer_on_stream(config, self.name.clone())
.await
}

Expand Down Expand Up @@ -784,7 +779,7 @@ impl Stream {
config: C,
) -> Result<Consumer<C>, ConsumerCreateStrictError> {
self.context
.create_consumer_strict_on_stream(config, self.info.config.name.clone())
.create_consumer_strict_on_stream(config, self.name.clone())
.await
}

Expand All @@ -810,7 +805,7 @@ impl Stream {
) -> Result<consumer::Info, crate::Error> {
let name = name.as_ref();

let subject = format!("CONSUMER.INFO.{}.{}", self.info.config.name, name);
let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);

match self.context.request(subject, &json!({})).await? {
Response::Ok(info) => Ok(info),
Expand Down Expand Up @@ -884,7 +879,7 @@ impl Stream {
name: &str,
config: T,
) -> Result<Consumer<T>, ConsumerError> {
let subject = format!("CONSUMER.INFO.{}.{}", self.info.config.name, name);
let subject = format!("CONSUMER.INFO.{}.{}", self.name, name);

match self.context.request(subject, &json!({})).await? {
Response::Err { error } if error.code() == 404 => self.create_consumer(config).await,
Expand Down Expand Up @@ -920,7 +915,7 @@ impl Stream {
/// # }
/// ```
pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, ConsumerError> {
let subject = format!("CONSUMER.DELETE.{}.{}", self.info.config.name, name);
let subject = format!("CONSUMER.DELETE.{}.{}", self.name, name);

match self.context.request(subject, &json!({})).await? {
Response::Ok(delete_status) => Ok(delete_status),
Expand Down Expand Up @@ -949,7 +944,7 @@ impl Stream {
pub fn consumer_names(&self) -> ConsumerNames {
ConsumerNames {
context: self.context.clone(),
stream: self.info.config.name.clone(),
stream: self.name.clone(),
offset: 0,
page_request: None,
consumers: Vec::new(),
Expand Down Expand Up @@ -978,7 +973,7 @@ impl Stream {
pub fn consumers(&self) -> Consumers {
Consumers {
context: self.context.clone(),
stream: self.info.config.name.clone(),
stream: self.name.clone(),
offset: 0,
page_request: None,
consumers: Vec::new(),
Expand Down Expand Up @@ -1570,49 +1565,52 @@ impl ToAssign for Yes {}
impl ToAssign for No {}

#[derive(Debug)]
pub struct Purge<'a, SEQUENCE, KEEP>
pub struct Purge<SEQUENCE, KEEP>
where
SEQUENCE: ToAssign,
KEEP: ToAssign,
{
stream: &'a Stream,
inner: PurgeRequest,
sequence_set: PhantomData<SEQUENCE>,
keep_set: PhantomData<KEEP>,
context: Context,
stream_name: String,
}

impl<'a, SEQUENCE, KEEP> Purge<'a, SEQUENCE, KEEP>
impl<'a, SEQUENCE, KEEP> Purge<SEQUENCE, KEEP>
where
SEQUENCE: ToAssign,
KEEP: ToAssign,
{
/// Adds subject filter to [PurgeRequest]
pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<'a, SEQUENCE, KEEP> {
pub fn filter<T: Into<String>>(mut self, filter: T) -> Purge<SEQUENCE, KEEP> {
self.inner.filter = Some(filter.into());
self
}
}

impl<'a> Purge<'a, No, No> {
pub(crate) fn build(stream: &'a Stream) -> Purge<'a, No, No> {
impl<'a> Purge<No, No> {
pub(crate) fn build<I>(stream: &'a Stream<I>) -> Purge<No, No> {
Purge {
stream,
context: stream.context.clone(),
stream_name: stream.name.clone(),
inner: Default::default(),
sequence_set: PhantomData {},
keep_set: PhantomData {},
}
}
}

impl<'a, KEEP> Purge<'a, No, KEEP>
impl<'a, KEEP> Purge<No, KEEP>
where
KEEP: ToAssign,
{
/// Creates a new [PurgeRequest].
/// `keep` and `sequence` are exclusive, enforced compile time by generics.
pub fn keep(self, keep: u64) -> Purge<'a, No, Yes> {
pub fn keep(self, keep: u64) -> Purge<No, Yes> {
Purge {
stream: self.stream,
context: self.context.clone(),
stream_name: self.stream_name.clone(),
sequence_set: PhantomData {},
keep_set: PhantomData {},
inner: PurgeRequest {
Expand All @@ -1622,15 +1620,16 @@ where
}
}
}
impl<'a, SEQUENCE> Purge<'a, SEQUENCE, No>
impl<'a, SEQUENCE> Purge<SEQUENCE, No>
where
SEQUENCE: ToAssign,
{
/// Creates a new [PurgeRequest].
/// `keep` and `sequence` are exclusive, enforces compile time by generics.
pub fn sequence(self, sequence: u64) -> Purge<'a, Yes, No> {
pub fn sequence(self, sequence: u64) -> Purge<Yes, No> {
Purge {
stream: self.stream,
context: self.context.clone(),
stream_name: self.stream_name.clone(),
sequence_set: PhantomData {},
keep_set: PhantomData {},
inner: PurgeRequest {
Expand Down Expand Up @@ -1660,20 +1659,19 @@ impl Display for PurgeErrorKind {

pub type PurgeError = Error<PurgeErrorKind>;

impl<'a, S, K> IntoFuture for Purge<'a, S, K>
impl<S, K> IntoFuture for Purge<S, K>
where
S: ToAssign + std::marker::Send,
K: ToAssign + std::marker::Send,
{
type Output = Result<PurgeResponse, PurgeError>;

type IntoFuture = BoxFuture<'a, Result<PurgeResponse, PurgeError>>;
type IntoFuture = BoxFuture<'static, Result<PurgeResponse, PurgeError>>;

fn into_future(self) -> Self::IntoFuture {
Box::pin(std::future::IntoFuture::into_future(async move {
let request_subject = format!("STREAM.PURGE.{}", self.stream.info.config.name);
let request_subject = format!("STREAM.PURGE.{}", self.stream_name);
let response: Response<PurgeResponse> = self
.stream
.context
.request(request_subject, &self.inner)
.map_err(|err| match err.kind() {
Expand Down
17 changes: 16 additions & 1 deletion async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ mod jetstream {
use async_nats::jetstream::context::{GetStreamByNameErrorKind, Publish, PublishErrorKind};
use async_nats::jetstream::response::Response;
use async_nats::jetstream::stream::{
self, ConsumerCreateStrictErrorKind, ConsumerUpdateErrorKind, DiscardPolicy, StorageType,
self, ConsumerCreateStrictErrorKind, ConsumerUpdateErrorKind, DirectGetErrorKind,
DiscardPolicy, StorageType,
};
#[cfg(feature = "server_2_10")]
use async_nats::jetstream::stream::{Compression, ConsumerLimits, Source, SubjectTransform};
Expand Down Expand Up @@ -862,6 +863,20 @@ mod jetstream {
stream.direct_get(22).await.expect_err("should error");
}

#[tokio::test]
async fn direct_get_no_stream() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

let stream = context.get_stream_no_info("NO_STREAM").await.unwrap();

assert_eq!(
stream.direct_get(1).await.unwrap_err().kind(),
DirectGetErrorKind::TimedOut
);
}

#[tokio::test]
async fn delete_message() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit 67dcd95

Please sign in to comment.