Skip to content

Commit

Permalink
refactor: bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
parmesant committed Dec 24, 2024
1 parent 6db3908 commit 8f04ff5
Show file tree
Hide file tree
Showing 5 changed files with 199 additions and 80 deletions.
59 changes: 49 additions & 10 deletions src/handlers/http/alerts/alerts_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@

use datafusion::{
common::tree_node::TreeNode,
functions_aggregate::{
count::count,
expr_fn::avg,
min_max::{max, min},
sum::sum,
},
prelude::{col, lit, Expr},
};
use tracing::trace;
Expand Down Expand Up @@ -77,8 +83,8 @@ pub async fn user_auth_for_query(session_key: &SessionKey, query: &str) -> Resul
}

/// This function contains the logic to run the alert evaluation task
pub async fn evaluate_alert(alert: AlertConfig) -> Result<(), AlertError> {
println!("RUNNING EVAL TASK FOR- {alert:?}");
pub async fn evaluate_alert(alert: &AlertConfig) -> Result<(), AlertError> {
trace!("RUNNING EVAL TASK FOR- {alert:?}");

let (start_time, end_time) = match &alert.eval_type {
super::EvalConfig::RollingWindow(rolling_window) => {
Expand All @@ -87,13 +93,11 @@ pub async fn evaluate_alert(alert: AlertConfig) -> Result<(), AlertError> {
};

let session_state = QUERY_SESSION.state();
let raw_logical_plan = session_state
.create_logical_plan(&alert.query)
.await
.unwrap();
let raw_logical_plan = session_state.create_logical_plan(&alert.query).await?;

// TODO: Filter tags should be taken care of!!!
let time_range = TimeRange::parse_human_time(start_time, end_time).unwrap();
let time_range = TimeRange::parse_human_time(start_time, end_time)
.map_err(|err| AlertError::CustomError(err.to_string()))?;
let query = crate::query::Query {
raw_logical_plan,
time_range,
Expand All @@ -102,12 +106,28 @@ pub async fn evaluate_alert(alert: AlertConfig) -> Result<(), AlertError> {

// for now proceed in a similar fashion as we do in query
// TODO: in case of multiple table query does the selection of time partition make a difference? (especially when the tables don't have overlapping data)
let stream_name = query.first_table_name().unwrap();
let stream_name = if let Some(stream_name) = query.first_table_name() {
stream_name
} else {
return Err(AlertError::CustomError(format!(
"Table name not found in query- {}",
alert.query
)));
};

let df = query.get_dataframe(stream_name).await.unwrap();
let df = query
.get_dataframe(stream_name)
.await
.map_err(|err| AlertError::CustomError(err.to_string()))?;

// let df = DataFrame::new(session_state, raw_logical_plan);

// for now group by is empty, we can include this later
let group_expr = vec![];

// agg expression
let mut aggr_expr = vec![];

let mut expr = Expr::Literal(datafusion::scalar::ScalarValue::Boolean(Some(true)));
for threshold in &alert.thresholds {
let res = match threshold.operator {
Expand Down Expand Up @@ -137,10 +157,29 @@ pub async fn evaluate_alert(alert: AlertConfig) -> Result<(), AlertError> {
}
};

aggr_expr.push(match threshold.agg {
crate::handlers::http::alerts::Aggregate::Avg => {
avg(col(&threshold.column)).alias(&threshold.column)
}
crate::handlers::http::alerts::Aggregate::Count => {
count(col(&threshold.column)).alias(&threshold.column)
}
crate::handlers::http::alerts::Aggregate::Min => {
min(col(&threshold.column)).alias(&threshold.column)
}
crate::handlers::http::alerts::Aggregate::Max => {
max(col(&threshold.column)).alias(&threshold.column)
}
crate::handlers::http::alerts::Aggregate::Sum => {
sum(col(&threshold.column)).alias(&threshold.column)
}
});
expr = expr.and(res);
}

let nrows = df.clone().filter(expr).unwrap().count().await.unwrap();
let df = df.aggregate(group_expr, aggr_expr)?;

let nrows = df.clone().filter(expr)?.count().await?;
trace!("dataframe-\n{:?}", df.collect().await);

if nrows > 0 {
Expand Down
56 changes: 36 additions & 20 deletions src/handlers/http/alerts/http_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@

use crate::{
option::CONFIG,
storage::object_storage::alert_json_path,
storage::{object_storage::alert_json_path, ALERTS_ROOT_DIRECTORY, PARSEABLE_ROOT_DIRECTORY},
sync::schedule_alert_task,
utils::{actix::extract_session_key_from_req, uid::Uid},
utils::actix::extract_session_key_from_req,
};
use actix_web::{web, HttpRequest, Responder};
use bytes::Bytes;
use tracing::warn;
use relative_path::RelativePathBuf;

use super::{alerts_utils::user_auth_for_query, AlertConfig, AlertError, AlertState, ALERTS};
use super::{
alerts_utils::user_auth_for_query, AlertConfig, AlertError, AlertRequest, AlertState, ALERTS,
};

// GET /alerts
/// User needs at least a read access to the stream(s) that is being referenced in an alert
Expand All @@ -39,10 +41,11 @@ pub async fn list(req: HttpRequest) -> Result<impl Responder, AlertError> {
}

// POST /alerts
pub async fn post(req: HttpRequest, alert: AlertConfig) -> Result<impl Responder, AlertError> {
pub async fn post(req: HttpRequest, alert: AlertRequest) -> Result<impl Responder, AlertError> {
let alert: AlertConfig = alert.into();
// validate the incoming alert query
// does the user have access to these tables or not?
let session_key = extract_session_key_from_req(&req).unwrap();
let session_key = extract_session_key_from_req(&req)?;
user_auth_for_query(&session_key, &alert.query).await?;

// now that we've validated that the user can run this query
Expand Down Expand Up @@ -71,7 +74,10 @@ pub async fn get(req: HttpRequest) -> Result<impl Responder, AlertError> {
.get("alert_id")
.ok_or(AlertError::Metadata("No alert ID Provided"))?;

let alert = ALERTS.get_alert_by_id(session_key, id).await?;
let alert = ALERTS.get_alert_by_id(id).await?;
// validate that the user has access to the tables mentioned
user_auth_for_query(&session_key, &alert.query).await?;

Ok(web::Json(alert))
}

Expand All @@ -95,31 +101,34 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, AlertError> {
// PUT /alerts/{alert_id}
/// first save on disk, then in memory
/// then modify scheduled task
pub async fn modify(
req: HttpRequest,
mut alert: AlertConfig,
) -> Result<impl Responder, AlertError> {
pub async fn modify(req: HttpRequest, alert: AlertRequest) -> Result<impl Responder, AlertError> {
let session_key = extract_session_key_from_req(&req)?;
let alert_id = req
.match_info()
.get("alert_id")
.ok_or(AlertError::Metadata("No alert ID Provided"))?;

// ensure that the user doesn't unknowingly change the ID
if alert_id != alert.id.to_string() {
warn!("Alert modify request is trying to change Alert ID, reverting ID");
alert.id = Uid::from_string(alert_id)
.map_err(|_| AlertError::CustomError("Unable to get Uid from String".to_owned()))?;
}
// check if alert id exists in map
ALERTS.get_alert_by_id(alert_id).await?;

// validate that the user has access to the tables mentioned
user_auth_for_query(&session_key, &alert.query).await?;

// // fetch the alert from this ID to get AlertState
// let state = ALERTS.get_alert_by_id(session_key, alert_id).await?.state;

let store = CONFIG.storage().get_object_store();

// fetch the alert object for the relevant ID
let old_alert_config: AlertConfig = serde_json::from_slice(
&store
.get_object(&RelativePathBuf::from_iter([
PARSEABLE_ROOT_DIRECTORY,
ALERTS_ROOT_DIRECTORY,
&format!("{alert_id}.json"),
]))
.await?,
)?;

let alert = alert.modify(old_alert_config);

// modify on disk
store.put_alert(&alert.id.to_string(), &alert).await?;

Expand All @@ -136,11 +145,18 @@ pub async fn modify(

// PUT /alerts/{alert_id}/update_state
pub async fn update_state(req: HttpRequest, state: String) -> Result<impl Responder, AlertError> {
let session_key = extract_session_key_from_req(&req)?;
let alert_id = req
.match_info()
.get("alert_id")
.ok_or(AlertError::Metadata("No alert ID Provided"))?;

// check if alert id exists in map
let alert = ALERTS.get_alert_by_id(alert_id).await?;

// validate that the user has access to the tables mentioned
user_auth_for_query(&session_key, &alert.query).await?;

// get current state
let current_state = ALERTS.get_state(alert_id).await?;

Expand Down
Loading

0 comments on commit 8f04ff5

Please sign in to comment.