Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix service Info to include endpoints info #1119

Merged
merged 1 commit into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions async-nats/src/service/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,15 @@ pub struct Stats {
/// Queue group to which this endpoint is assigned to.
pub queue_group: String,
}

#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Eq)]
pub struct Info {
/// Name of the endpoint.
pub name: String,
/// Endpoint subject.
pub subject: String,
/// Queue group to which this endpoint is assigned.
pub queue_group: String,
/// Endpoint-specific metadata.
pub metadata: HashMap<String, String>,
}
28 changes: 19 additions & 9 deletions async-nats/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ pub struct Info {
pub description: Option<String>,
/// Service version.
pub version: String,
/// All service endpoints.
pub subjects: Vec<String>,
/// Additional metadata
pub metadata: HashMap<String, String>,
/// Info about all service endpoints.
pub endpoints: Vec<endpoint::Info>,
}

/// Configuration of the [Service].
Expand Down Expand Up @@ -322,6 +322,10 @@ impl Service {
"service name is not a valid string (only A-Z, a-z, 0-9, _, - are allowed)",
)));
}
let endpoints_state = Arc::new(Mutex::new(Endpoints {
endpoints: HashMap::new(),
}));

let queue_group = config
.queue_group
.unwrap_or(DEFAULT_QUEUE_GROUP.to_string());
Expand All @@ -334,15 +338,12 @@ impl Service {
id: id.clone(),
description: config.description.clone(),
version: config.version.clone(),
subjects: Vec::default(),
metadata: config.metadata.clone().unwrap_or_default(),
endpoints: Vec::new(),
};

let (shutdown_tx, _) = tokio::sync::broadcast::channel(1);

let endpoints = HashMap::new();
let endpoints_state = Arc::new(Mutex::new(Endpoints { endpoints }));

// create subscriptions for all verbs.
let mut pings =
verb_subscription(client.clone(), Verb::Ping, config.name.clone(), id.clone()).await?;
Expand All @@ -355,7 +356,6 @@ impl Service {
let handle = tokio::task::spawn({
let mut stats_callback = config.stats_handler;
let info = info.clone();
let subjects = subjects.clone();
let endpoints_state = endpoints_state.clone();
let client = client.clone();
async move {
Expand All @@ -371,10 +371,20 @@ impl Service {
client.publish(ping.reply.unwrap(), pong.into()).await?;
},
Some(info_request) = infos.next() => {
let subjects = subjects.clone();
let info = info.clone();

let endpoints: Vec<endpoint::Info> = {
endpoints_state.lock().unwrap().endpoints.values().map(|value| {
endpoint::Info {
name: value.name.to_owned(),
subject: value.subject.to_owned(),
queue_group: value.queue_group.to_owned(),
metadata: value.metadata.to_owned()
}
}).collect()
};
let info = Info {
subjects: subjects.lock().unwrap().to_vec(),
endpoints,
..info
};
let info_json = serde_json::to_vec(&info).map(Bytes::from)?;
Expand Down
39 changes: 39 additions & 0 deletions async-nats/tests/service_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,45 @@ mod service {
assert_eq!(responses.take(2).count().await, 2);
}

#[tokio::test]
async fn info() {
let server = nats_server::run_basic_server();
let client = async_nats::connect(server.client_url()).await.unwrap();

let service = client
.service_builder()
.start("service", "1.0.0")
.await
.unwrap();

let endpoint_info = service::endpoint::Info {
name: "endpoint_1".to_string(),
subject: "subject".to_string(),
queue_group: "queue".to_string(),
metadata: HashMap::from([("key".to_string(), "value".to_string())]),
};

service
.endpoint_builder()
.name(&endpoint_info.name)
.metadata(endpoint_info.metadata.clone())
.queue_group(&endpoint_info.queue_group)
.add(&endpoint_info.subject)
.await
.unwrap();

let info: service::Info = serde_json::from_slice(
&client
.request("$SRV.INFO".into(), "".into())
.await
.unwrap()
.payload,
)
.unwrap();

assert_eq!(&endpoint_info, info.endpoints.first().unwrap());
}

#[tokio::test]
#[cfg(not(target_os = "windows"))]
async fn cross_clients_tests() {
Expand Down
Loading