diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 9d957bd2c..5ac41cae2 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -33,6 +33,8 @@ use crate::handlers::{PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KE use crate::metadata::STREAM_INFO; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; +use super::logstream::error::CreateStreamError; + // Handler for POST /api/v1/ingest // ingests events by extracting stream name from header // creates if stream does not exist @@ -43,9 +45,7 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result Result<(), PostError> { + if STREAM_INFO.stream_exists(stream_name) { + return Ok(()); + } + super::logstream::create_stream(stream_name.to_string()).await?; + Ok(()) +} + #[derive(Debug, thiserror::Error)] pub enum PostError { #[error("Stream {0} not found")] @@ -116,8 +125,8 @@ pub enum PostError { Event(#[from] EventError), #[error("Invalid Request: {0}")] Invalid(#[from] anyhow::Error), - #[error("Failed to create stream due to {0}")] - CreateStream(Box), + #[error("{0}")] + CreateStream(#[from] CreateStreamError), } impl actix_web::ResponseError for PostError { @@ -127,6 +136,9 @@ impl actix_web::ResponseError for PostError { PostError::Header(_) => StatusCode::BAD_REQUEST, PostError::Event(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::Invalid(_) => StatusCode::BAD_REQUEST, + PostError::CreateStream(CreateStreamError::StreamNameValidation(_)) => { + StatusCode::BAD_REQUEST + } PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR, PostError::StreamNotFound(_) => StatusCode::NOT_FOUND, } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index f3c9c93d3..5fa5fbb43 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -31,18 +31,16 @@ use crate::storage::{LogStream, StorageDir}; use crate::{event, stats}; use crate::{metadata, validator}; -use self::error::StreamError; +use self::error::{CreateStreamError, StreamError}; pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - validator::stream_name(&stream_name)?; - - let objectstore = CONFIG.storage().get_object_store(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { return Err(StreamError::StreamNotFound(stream_name)); } + let objectstore = CONFIG.storage().get_object_store(); objectstore.delete_stream(&stream_name).await?; metadata::STREAM_INFO.delete_stream(&stream_name); event::STREAM_WRITERS.delete_stream(&stream_name); @@ -269,27 +267,14 @@ fn remove_id_from_alerts(value: &mut Value) { } } -// Check if the stream exists and create a new stream if doesn't exist -pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), StreamError> { - if metadata::STREAM_INFO.stream_exists(stream_name) { - return Ok(()); - } - - create_stream(stream_name.to_string()).await -} - -pub async fn create_stream(stream_name: String) -> Result<(), StreamError> { +pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name validator::stream_name(&stream_name)?; // Proceed to create log stream if it doesn't exist let storage = CONFIG.storage().get_object_store(); - if let Err(e) = storage.create_stream(&stream_name).await { - // Fail if unable to create log stream on object store backend - return Err(StreamError::Custom { - msg: format!("failed to create log stream {stream_name} due to err: {e}"), - status: StatusCode::INTERNAL_SERVER_ERROR, - }); + if let Err(err) = storage.create_stream(&stream_name).await { + return Err(CreateStreamError::Storage { stream_name, err }); } metadata::STREAM_INFO.add_stream(stream_name.to_string()); @@ -308,9 +293,20 @@ pub mod error { }; #[derive(Debug, thiserror::Error)] - pub enum StreamError { + pub enum CreateStreamError { #[error("Stream name validation failed due to {0}")] StreamNameValidation(#[from] StreamNameValidationError), + #[error("failed to create log stream {stream_name} due to err: {err}")] + Storage { + stream_name: String, + err: ObjectStorageError, + }, + } + + #[derive(Debug, thiserror::Error)] + pub enum StreamError { + #[error("{0}")] + CreateStream(#[from] CreateStreamError), #[error("Log stream {0} does not exist")] StreamNotFound(String), #[error("Log stream is not initialized, send an event to this logstream and try again")] @@ -341,7 +337,12 @@ pub mod error { impl actix_web::ResponseError for StreamError { fn status_code(&self) -> http::StatusCode { match self { - StreamError::StreamNameValidation(_) => StatusCode::BAD_REQUEST, + StreamError::CreateStream(CreateStreamError::StreamNameValidation(_)) => { + StatusCode::BAD_REQUEST + } + StreamError::CreateStream(CreateStreamError::Storage { .. }) => { + StatusCode::INTERNAL_SERVER_ERROR + } StreamError::StreamNotFound(_) => StatusCode::NOT_FOUND, StreamError::Custom { status, .. } => *status, StreamError::UninitializedLogstream => StatusCode::METHOD_NOT_ALLOWED,