From e4b7dfc65f608bb6d43c95d0bddc407f44ce9ed7 Mon Sep 17 00:00:00 2001 From: Ryan Daum Date: Thu, 21 Sep 2023 17:13:54 -0400 Subject: [PATCH] Add a bunch of metrics to the RPC server --- crates/daemon/src/rpc_server.rs | 81 +++++++++++++++++++++------------ 1 file changed, 51 insertions(+), 30 deletions(-) diff --git a/crates/daemon/src/rpc_server.rs b/crates/daemon/src/rpc_server.rs index d8c27483..6ca58ad0 100644 --- a/crates/daemon/src/rpc_server.rs +++ b/crates/daemon/src/rpc_server.rs @@ -8,6 +8,7 @@ use anyhow::{bail, Context, Error}; use async_trait::async_trait; use futures_util::SinkExt; use itertools::Itertools; +use metrics_macros::increment_counter; use tmq::publish::Publish; use tmq::{publish, reply, Multipart}; use tokio::sync::{Mutex, RwLock}; @@ -72,14 +73,17 @@ impl RpcServer { client_id: Uuid, request: RpcRequest, ) -> Multipart { + increment_counter!("rpc_server.process_request"); match request { RpcRequest::ConnectionEstablish(hostname) => { + increment_counter!("rpc_server.connection_establish"); make_response( self.connection_establish(client_id, hostname.as_str()) .await, ) } RpcRequest::RequestSysProp(object, property) => { + increment_counter!("rpc_server.request_sys_prop"); { let client_connections = self.client_connections.read().await; let Some(_) = client_connections.get(&client_id) else { @@ -89,6 +93,7 @@ impl RpcServer { make_response(self.clone().request_sys_prop(object, property).await) } RpcRequest::LoginCommand(args) => { + increment_counter!("rpc_server.login_command"); let connection = { let client_connections = self.client_connections.read().await; let Some(connection) = client_connections.get(&client_id) else { @@ -104,6 +109,7 @@ impl RpcServer { ) } RpcRequest::Command(command) => { + increment_counter!("rpc_server.command"); let connection = { let client_connections = self.client_connections.read().await; let Some(connection) = client_connections.get(&client_id) else { @@ -118,6 +124,7 @@ impl RpcServer { ) } RpcRequest::Eval(evalstr) => { + increment_counter!("rpc_server.eval"); let connection = { let client_connections = self.client_connections.read().await; let Some(connection) = client_connections.get(&client_id) else { @@ -129,6 +136,7 @@ impl RpcServer { make_response(self.clone().eval(client_id, connection, evalstr).await) } RpcRequest::Detach => { + increment_counter!("rpc_server.detach"); // Detach this client id from the player/connection object. let Ok(_) = self.remove_client_connection(client_id).await else { return make_response(Err(RpcError::InternalError( @@ -147,6 +155,7 @@ impl RpcServer { connection: Objid, ) -> Result, Error> { debug!(?client_id, ?connection, "Started session",); + increment_counter!("rpc_server.new_session"); Ok(Arc::new(RpcSession::new( client_id, self.clone(), @@ -366,12 +375,9 @@ impl RpcServer { connection: Objid, args: Vec, ) -> Result { - info!("Performing login for client: {}", client_id); - let Ok(session) = self - .clone() - .new_session(client_id, connection) - .await - else { + debug!("Performing login for client: {}", client_id); + let Ok(session) = self.clone().new_session(client_id, connection).await else { + increment_counter!("rpc_server.perform_login.create_session_failed"); return Err(RpcError::CreateSessionFailed); }; let task_id = match self @@ -390,6 +396,7 @@ impl RpcServer { Ok(t) => t, Err(e) => { error!(error = ?e, "Error submitting login task"); + increment_counter!("rpc_server.perform_login.submit_login_task_failed"); return Err(RpcError::InternalError(e.to_string())); } }; @@ -397,6 +404,7 @@ impl RpcServer { Ok(r) => r, Err(e) => { error!(error = ?e, "Error subscribing to login task"); + increment_counter!("rpc_server.perform_login.subscribe_login_task_failed"); return Err(RpcError::LoginTaskFailed); } }; @@ -415,10 +423,12 @@ impl RpcServer { } Ok(TaskWaiterResult::Error(e)) => { error!(error = ?e, "Error waiting for login results"); + increment_counter!("rpc_server.perform_login.login_task_failed"); return Err(RpcError::LoginTaskFailed); } Err(e) => { error!(error = ?e, "Error waiting for login results"); + increment_counter!("rpc_server.perform_login.login_task_failed"); return Err(RpcError::InternalError(e.to_string())); } }; @@ -430,6 +440,7 @@ impl RpcServer { "Transitioning connection record to logged in" ); let Ok(_) = self.update_client_connection(connection, player).await else { + increment_counter!("rpc_server.perform_login.update_client_connection_failed"); return Err(RpcError::InternalError( "Unable to update client connection".to_string(), )); @@ -444,9 +455,12 @@ impl RpcServer { .await { error!(error = ?e, "Error submitting user_connected task"); - return Err(RpcError::InternalError(e.to_string())); + increment_counter!("rpc_server.perform_login.submit_connected_task_failed"); + // Note we still continue to return a successful login result here, hoping for the best + // but we do log the error. } + increment_counter!("rpc_server.perform_login.success"); Ok(LoginResult(Some((ConnectType::Connected, player)))) } @@ -487,11 +501,8 @@ impl RpcServer { connection: Objid, command: String, ) -> Result { - let Ok(session) = self - .clone() - .new_session(client_id, connection) - .await - else { + let Ok(session) = self.clone().new_session(client_id, connection).await else { + increment_counter!("rpc_server.perform_command.create_session_failed"); return Err(RpcError::CreateSessionFailed); }; @@ -504,21 +515,16 @@ impl RpcServer { .await { Ok(t) => t, - Err(SchedulerError::CouldNotParseCommand(err)) => { - debug!( - command, - ?client_id, - ?connection, - ?err, - "Could not parse command" - ); + Err(SchedulerError::CouldNotParseCommand(_)) => { + increment_counter!("rpc_server.perform_command.could_not_parse_command"); return Err(RpcError::CouldNotParseCommand); } - Err(SchedulerError::NoCommandMatch(s, m)) => { - debug!(command, ?client_id, ?connection, ?s, ?m, "No command match"); + Err(SchedulerError::NoCommandMatch(s, _)) => { + increment_counter!("rpc_server.perform_command.no_command_match"); return Err(RpcError::NoCommandMatch(s)); } Err(e) => { + increment_counter!("rpc_server.perform_command.submit_command_task_failed"); error!(error = ?e, "Error submitting command task"); return Err(RpcError::InternalError(e.to_string())); } @@ -528,6 +534,7 @@ impl RpcServer { let receiver = match self.clone().scheduler.subscribe_to_task(task_id).await { Ok(r) => r, Err(e) => { + increment_counter!("rpc_server.perform_command.subscribe_command_task_failed"); error!(error = ?e, "Error subscribing to command task"); return Err(RpcError::InternalError(e.to_string())); } @@ -535,26 +542,33 @@ impl RpcServer { match receiver.await { Ok(TaskWaiterResult::Success(_)) => { + increment_counter!("rpc_server.perform_command.success"); Ok(RpcResponse::CommandComplete) } Ok(TaskWaiterResult::Error(SchedulerError::PermissionDenied)) => { + increment_counter!("rpc_server.perform_command.permission_denied"); Err(RpcError::PermissionDenied) } Ok(TaskWaiterResult::Error(SchedulerError::CouldNotParseCommand(_))) => { + increment_counter!("rpc_server.perform_command.could_not_parse_command"); Err(RpcError::CouldNotParseCommand) } Ok(TaskWaiterResult::Error(SchedulerError::NoCommandMatch(s, _))) => { + increment_counter!("rpc_server.perform_command.no_command_match"); Err(RpcError::NoCommandMatch(s)) } Ok(TaskWaiterResult::Error(SchedulerError::DatabaseError(e))) => { + increment_counter!("rpc_server.perform_command.database_error"); Err(RpcError::DatabaseError(e)) } Ok(TaskWaiterResult::Error(e)) => { warn!(error = ?e, "Error processing command"); + increment_counter!("rpc_server.perform_command.error"); Err(RpcError::InternalError(e.to_string())) } Err(e) => { warn!(error = ?e, "Error processing command"); + increment_counter!("rpc_server.perform_command.error"); Err(RpcError::InternalError(e.to_string())) } } @@ -566,11 +580,8 @@ impl RpcServer { connection: Objid, expression: String, ) -> Result { - let Ok(session) = self - .clone() - .new_session(client_id, connection) - .await - else { + let Ok(session) = self.clone().new_session(client_id, connection).await else { + increment_counter!("rpc_server.eval.create_session_failed"); return Err(RpcError::CreateSessionFailed); }; @@ -582,6 +593,7 @@ impl RpcServer { { Ok(t) => t, Err(e) => { + increment_counter!("rpc_server.eval.submit_eval_task_failed"); error!(error = ?e, "Error submitting eval task"); return Err(RpcError::InternalError(e.to_string())); } @@ -590,24 +602,26 @@ impl RpcServer { let receiver = match self.clone().scheduler.subscribe_to_task(task_id).await { Ok(r) => r, Err(e) => { + increment_counter!("rpc_server.eval.subscribe_eval_task_failed"); error!(error = ?e, "Error subscribing to command task"); return Err(RpcError::InternalError(e.to_string())); } }; match receiver.await { - Ok(TaskWaiterResult::Success(v)) => { - Ok(RpcResponse::EvalResult(v)) - } + Ok(TaskWaiterResult::Success(v)) => Ok(RpcResponse::EvalResult(v)), Ok(TaskWaiterResult::Error(SchedulerError::DatabaseError(e))) => { + increment_counter!("rpc_server.eval.database_error"); Err(RpcError::DatabaseError(e)) } Ok(TaskWaiterResult::Error(e)) => { error!(error = ?e, "Error processing eval"); + increment_counter!("rpc_server.eval.task_error"); Err(RpcError::InternalError(e.to_string())) } Err(e) => { error!(error = ?e, "Error processing eval"); + increment_counter!("rpc_server.eval.internal_error"); Err(RpcError::InternalError(e.to_string())) } } @@ -617,6 +631,7 @@ impl RpcServer { &self, events: &[(Objid, NarrativeEvent)], ) -> Result<(), Error> { + increment_counter!("rpc_server.publish_narrative_events"); let mut publish = self.publish.lock().await; let connections_client = self.connections_client.read().await; for (player, event) in events { @@ -647,6 +662,7 @@ impl RpcServer { player: Objid, message: String, ) -> Result<(), Error> { + increment_counter!("rpc_server.send_system_message"); let mut publish = self.publish.lock().await; let event = ConnectionEvent::SystemMessage(player, message); let event_bytes = bincode::encode_to_vec(&event, bincode::config::standard())?; @@ -682,11 +698,13 @@ pub(crate) async fn zmq_loop( return Ok(()); } Ok((mut request, reply)) => { + increment_counter!("rpc_server.request"); trace!(num_parts = request.len(), "ZQM Request received"); // Components are: if request.len() != 2 { error!("Invalid request received, ignoring"); + increment_counter!("rpc_server.invalid_request"); rpc_socket = reply .send(make_response(Err(RpcError::InvalidRequest))) .await?; @@ -696,6 +714,7 @@ pub(crate) async fn zmq_loop( let (Some(client_id), Some(request_body)) = (request.pop_front(), request.pop_front()) else { + increment_counter!("rpc_server.invalid_request"); rpc_socket = reply .send(make_response(Err(RpcError::InvalidRequest))) .await?; @@ -703,6 +722,7 @@ pub(crate) async fn zmq_loop( }; let Ok(client_id) = Uuid::from_slice(&client_id) else { + increment_counter!("rpc_server.invalid_request"); rpc_socket = reply .send(make_response(Err(RpcError::InvalidRequest))) .await?; @@ -725,6 +745,7 @@ pub(crate) async fn zmq_loop( // on the type. let response = rpc_server.clone().process_request(client_id, request).await; rpc_socket = reply.send(response).await?; + increment_counter!("rpc_server.processed_requests"); } } }