Skip to content

Commit

Permalink
Add a bunch of metrics to the RPC server
Browse files Browse the repository at this point in the history
  • Loading branch information
rdaum committed Sep 21, 2023
1 parent 8931f73 commit e4b7dfc
Showing 1 changed file with 51 additions and 30 deletions.
81 changes: 51 additions & 30 deletions crates/daemon/src/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use anyhow::{bail, Context, Error};
use async_trait::async_trait;
use futures_util::SinkExt;
use itertools::Itertools;
use metrics_macros::increment_counter;
use tmq::publish::Publish;
use tmq::{publish, reply, Multipart};
use tokio::sync::{Mutex, RwLock};
Expand Down Expand Up @@ -72,14 +73,17 @@ impl RpcServer {
client_id: Uuid,
request: RpcRequest,
) -> Multipart {
increment_counter!("rpc_server.process_request");
match request {
RpcRequest::ConnectionEstablish(hostname) => {
increment_counter!("rpc_server.connection_establish");
make_response(
self.connection_establish(client_id, hostname.as_str())
.await,
)
}
RpcRequest::RequestSysProp(object, property) => {
increment_counter!("rpc_server.request_sys_prop");
{
let client_connections = self.client_connections.read().await;
let Some(_) = client_connections.get(&client_id) else {
Expand All @@ -89,6 +93,7 @@ impl RpcServer {
make_response(self.clone().request_sys_prop(object, property).await)
}
RpcRequest::LoginCommand(args) => {
increment_counter!("rpc_server.login_command");
let connection = {
let client_connections = self.client_connections.read().await;
let Some(connection) = client_connections.get(&client_id) else {
Expand All @@ -104,6 +109,7 @@ impl RpcServer {
)
}
RpcRequest::Command(command) => {
increment_counter!("rpc_server.command");
let connection = {
let client_connections = self.client_connections.read().await;
let Some(connection) = client_connections.get(&client_id) else {
Expand All @@ -118,6 +124,7 @@ impl RpcServer {
)
}
RpcRequest::Eval(evalstr) => {
increment_counter!("rpc_server.eval");
let connection = {
let client_connections = self.client_connections.read().await;
let Some(connection) = client_connections.get(&client_id) else {
Expand All @@ -129,6 +136,7 @@ impl RpcServer {
make_response(self.clone().eval(client_id, connection, evalstr).await)
}
RpcRequest::Detach => {
increment_counter!("rpc_server.detach");
// Detach this client id from the player/connection object.
let Ok(_) = self.remove_client_connection(client_id).await else {
return make_response(Err(RpcError::InternalError(
Expand All @@ -147,6 +155,7 @@ impl RpcServer {
connection: Objid,
) -> Result<Arc<dyn Session>, Error> {
debug!(?client_id, ?connection, "Started session",);
increment_counter!("rpc_server.new_session");
Ok(Arc::new(RpcSession::new(
client_id,
self.clone(),
Expand Down Expand Up @@ -366,12 +375,9 @@ impl RpcServer {
connection: Objid,
args: Vec<String>,
) -> Result<RpcResponse, RpcError> {
info!("Performing login for client: {}", client_id);
let Ok(session) = self
.clone()
.new_session(client_id, connection)
.await
else {
debug!("Performing login for client: {}", client_id);
let Ok(session) = self.clone().new_session(client_id, connection).await else {
increment_counter!("rpc_server.perform_login.create_session_failed");
return Err(RpcError::CreateSessionFailed);
};
let task_id = match self
Expand All @@ -390,13 +396,15 @@ impl RpcServer {
Ok(t) => t,
Err(e) => {
error!(error = ?e, "Error submitting login task");
increment_counter!("rpc_server.perform_login.submit_login_task_failed");
return Err(RpcError::InternalError(e.to_string()));
}
};
let receiver = match self.clone().scheduler.subscribe_to_task(task_id).await {
Ok(r) => r,
Err(e) => {
error!(error = ?e, "Error subscribing to login task");
increment_counter!("rpc_server.perform_login.subscribe_login_task_failed");
return Err(RpcError::LoginTaskFailed);
}
};
Expand All @@ -415,10 +423,12 @@ impl RpcServer {
}
Ok(TaskWaiterResult::Error(e)) => {
error!(error = ?e, "Error waiting for login results");
increment_counter!("rpc_server.perform_login.login_task_failed");
return Err(RpcError::LoginTaskFailed);
}
Err(e) => {
error!(error = ?e, "Error waiting for login results");
increment_counter!("rpc_server.perform_login.login_task_failed");
return Err(RpcError::InternalError(e.to_string()));
}
};
Expand All @@ -430,6 +440,7 @@ impl RpcServer {
"Transitioning connection record to logged in"
);
let Ok(_) = self.update_client_connection(connection, player).await else {
increment_counter!("rpc_server.perform_login.update_client_connection_failed");
return Err(RpcError::InternalError(
"Unable to update client connection".to_string(),
));
Expand All @@ -444,9 +455,12 @@ impl RpcServer {
.await
{
error!(error = ?e, "Error submitting user_connected task");
return Err(RpcError::InternalError(e.to_string()));
increment_counter!("rpc_server.perform_login.submit_connected_task_failed");
// Note we still continue to return a successful login result here, hoping for the best
// but we do log the error.
}

increment_counter!("rpc_server.perform_login.success");
Ok(LoginResult(Some((ConnectType::Connected, player))))
}

Expand Down Expand Up @@ -487,11 +501,8 @@ impl RpcServer {
connection: Objid,
command: String,
) -> Result<RpcResponse, RpcError> {
let Ok(session) = self
.clone()
.new_session(client_id, connection)
.await
else {
let Ok(session) = self.clone().new_session(client_id, connection).await else {
increment_counter!("rpc_server.perform_command.create_session_failed");
return Err(RpcError::CreateSessionFailed);
};

Expand All @@ -504,21 +515,16 @@ impl RpcServer {
.await
{
Ok(t) => t,
Err(SchedulerError::CouldNotParseCommand(err)) => {
debug!(
command,
?client_id,
?connection,
?err,
"Could not parse command"
);
Err(SchedulerError::CouldNotParseCommand(_)) => {
increment_counter!("rpc_server.perform_command.could_not_parse_command");
return Err(RpcError::CouldNotParseCommand);
}
Err(SchedulerError::NoCommandMatch(s, m)) => {
debug!(command, ?client_id, ?connection, ?s, ?m, "No command match");
Err(SchedulerError::NoCommandMatch(s, _)) => {
increment_counter!("rpc_server.perform_command.no_command_match");
return Err(RpcError::NoCommandMatch(s));
}
Err(e) => {
increment_counter!("rpc_server.perform_command.submit_command_task_failed");
error!(error = ?e, "Error submitting command task");
return Err(RpcError::InternalError(e.to_string()));
}
Expand All @@ -528,33 +534,41 @@ impl RpcServer {
let receiver = match self.clone().scheduler.subscribe_to_task(task_id).await {
Ok(r) => r,
Err(e) => {
increment_counter!("rpc_server.perform_command.subscribe_command_task_failed");
error!(error = ?e, "Error subscribing to command task");
return Err(RpcError::InternalError(e.to_string()));
}
};

match receiver.await {
Ok(TaskWaiterResult::Success(_)) => {
increment_counter!("rpc_server.perform_command.success");
Ok(RpcResponse::CommandComplete)
}
Ok(TaskWaiterResult::Error(SchedulerError::PermissionDenied)) => {
increment_counter!("rpc_server.perform_command.permission_denied");
Err(RpcError::PermissionDenied)
}
Ok(TaskWaiterResult::Error(SchedulerError::CouldNotParseCommand(_))) => {
increment_counter!("rpc_server.perform_command.could_not_parse_command");
Err(RpcError::CouldNotParseCommand)
}
Ok(TaskWaiterResult::Error(SchedulerError::NoCommandMatch(s, _))) => {
increment_counter!("rpc_server.perform_command.no_command_match");
Err(RpcError::NoCommandMatch(s))
}
Ok(TaskWaiterResult::Error(SchedulerError::DatabaseError(e))) => {
increment_counter!("rpc_server.perform_command.database_error");
Err(RpcError::DatabaseError(e))
}
Ok(TaskWaiterResult::Error(e)) => {
warn!(error = ?e, "Error processing command");
increment_counter!("rpc_server.perform_command.error");
Err(RpcError::InternalError(e.to_string()))
}
Err(e) => {
warn!(error = ?e, "Error processing command");
increment_counter!("rpc_server.perform_command.error");
Err(RpcError::InternalError(e.to_string()))
}
}
Expand All @@ -566,11 +580,8 @@ impl RpcServer {
connection: Objid,
expression: String,
) -> Result<RpcResponse, RpcError> {
let Ok(session) = self
.clone()
.new_session(client_id, connection)
.await
else {
let Ok(session) = self.clone().new_session(client_id, connection).await else {
increment_counter!("rpc_server.eval.create_session_failed");
return Err(RpcError::CreateSessionFailed);
};

Expand All @@ -582,6 +593,7 @@ impl RpcServer {
{
Ok(t) => t,
Err(e) => {
increment_counter!("rpc_server.eval.submit_eval_task_failed");
error!(error = ?e, "Error submitting eval task");
return Err(RpcError::InternalError(e.to_string()));
}
Expand All @@ -590,24 +602,26 @@ impl RpcServer {
let receiver = match self.clone().scheduler.subscribe_to_task(task_id).await {
Ok(r) => r,
Err(e) => {
increment_counter!("rpc_server.eval.subscribe_eval_task_failed");
error!(error = ?e, "Error subscribing to command task");
return Err(RpcError::InternalError(e.to_string()));
}
};

match receiver.await {
Ok(TaskWaiterResult::Success(v)) => {
Ok(RpcResponse::EvalResult(v))
}
Ok(TaskWaiterResult::Success(v)) => Ok(RpcResponse::EvalResult(v)),
Ok(TaskWaiterResult::Error(SchedulerError::DatabaseError(e))) => {
increment_counter!("rpc_server.eval.database_error");
Err(RpcError::DatabaseError(e))
}
Ok(TaskWaiterResult::Error(e)) => {
error!(error = ?e, "Error processing eval");
increment_counter!("rpc_server.eval.task_error");
Err(RpcError::InternalError(e.to_string()))
}
Err(e) => {
error!(error = ?e, "Error processing eval");
increment_counter!("rpc_server.eval.internal_error");
Err(RpcError::InternalError(e.to_string()))
}
}
Expand All @@ -617,6 +631,7 @@ impl RpcServer {
&self,
events: &[(Objid, NarrativeEvent)],
) -> Result<(), Error> {
increment_counter!("rpc_server.publish_narrative_events");
let mut publish = self.publish.lock().await;
let connections_client = self.connections_client.read().await;
for (player, event) in events {
Expand Down Expand Up @@ -647,6 +662,7 @@ impl RpcServer {
player: Objid,
message: String,
) -> Result<(), Error> {
increment_counter!("rpc_server.send_system_message");
let mut publish = self.publish.lock().await;
let event = ConnectionEvent::SystemMessage(player, message);
let event_bytes = bincode::encode_to_vec(&event, bincode::config::standard())?;
Expand Down Expand Up @@ -682,11 +698,13 @@ pub(crate) async fn zmq_loop(
return Ok(());
}
Ok((mut request, reply)) => {
increment_counter!("rpc_server.request");
trace!(num_parts = request.len(), "ZQM Request received");

// Components are:
if request.len() != 2 {
error!("Invalid request received, ignoring");
increment_counter!("rpc_server.invalid_request");
rpc_socket = reply
.send(make_response(Err(RpcError::InvalidRequest)))
.await?;
Expand All @@ -696,13 +714,15 @@ pub(crate) async fn zmq_loop(
let (Some(client_id), Some(request_body)) =
(request.pop_front(), request.pop_front())
else {
increment_counter!("rpc_server.invalid_request");
rpc_socket = reply
.send(make_response(Err(RpcError::InvalidRequest)))
.await?;
continue;
};

let Ok(client_id) = Uuid::from_slice(&client_id) else {
increment_counter!("rpc_server.invalid_request");
rpc_socket = reply
.send(make_response(Err(RpcError::InvalidRequest)))
.await?;
Expand All @@ -725,6 +745,7 @@ pub(crate) async fn zmq_loop(
// on the type.
let response = rpc_server.clone().process_request(client_id, request).await;
rpc_socket = reply.send(response).await?;
increment_counter!("rpc_server.processed_requests");
}
}
}
Expand Down

0 comments on commit e4b7dfc

Please sign in to comment.