Skip to content

Commit

Permalink
Merge branch 'main' into oauth
Browse files Browse the repository at this point in the history
  • Loading branch information
nitisht authored Sep 14, 2023
2 parents 7ab724d + dcc83e1 commit 5e613e1
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 27 deletions.
22 changes: 17 additions & 5 deletions server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use crate::handlers::{PREFIX_META, PREFIX_TAGS, SEPARATOR, STREAM_NAME_HEADER_KE
use crate::metadata::STREAM_INFO;
use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};

use super::logstream::error::CreateStreamError;

// Handler for POST /api/v1/ingest
// ingests events by extracting stream name from header
// creates if stream does not exist
Expand All @@ -43,9 +45,7 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
{
let stream_name = stream_name.to_str().unwrap().to_owned();
if let Err(e) = super::logstream::create_stream_if_not_exists(&stream_name).await {
return Err(PostError::CreateStream(e.into()));
}
create_stream_if_not_exists(&stream_name).await?;
push_logs(stream_name, req, body).await?;
Ok(HttpResponse::Ok().finish())
} else {
Expand Down Expand Up @@ -104,6 +104,15 @@ fn into_event_batch(
Ok((size, rb, is_first))
}

// Check if the stream exists and create a new stream if doesn't exist
pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostError> {
if STREAM_INFO.stream_exists(stream_name) {
return Ok(());
}
super::logstream::create_stream(stream_name.to_string()).await?;
Ok(())
}

#[derive(Debug, thiserror::Error)]
pub enum PostError {
#[error("Stream {0} not found")]
Expand All @@ -116,8 +125,8 @@ pub enum PostError {
Event(#[from] EventError),
#[error("Invalid Request: {0}")]
Invalid(#[from] anyhow::Error),
#[error("Failed to create stream due to {0}")]
CreateStream(Box<dyn std::error::Error + Send + Sync>),
#[error("{0}")]
CreateStream(#[from] CreateStreamError),
}

impl actix_web::ResponseError for PostError {
Expand All @@ -127,6 +136,9 @@ impl actix_web::ResponseError for PostError {
PostError::Header(_) => StatusCode::BAD_REQUEST,
PostError::Event(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::Invalid(_) => StatusCode::BAD_REQUEST,
PostError::CreateStream(CreateStreamError::StreamNameValidation(_)) => {
StatusCode::BAD_REQUEST
}
PostError::CreateStream(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::StreamNotFound(_) => StatusCode::NOT_FOUND,
}
Expand Down
45 changes: 23 additions & 22 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,16 @@ use crate::storage::{LogStream, StorageDir};
use crate::{event, stats};
use crate::{metadata, validator};

use self::error::StreamError;
use self::error::{CreateStreamError, StreamError};

pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
validator::stream_name(&stream_name)?;

let objectstore = CONFIG.storage().get_object_store();

if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}

let objectstore = CONFIG.storage().get_object_store();
objectstore.delete_stream(&stream_name).await?;
metadata::STREAM_INFO.delete_stream(&stream_name);
event::STREAM_WRITERS.delete_stream(&stream_name);
Expand Down Expand Up @@ -269,27 +267,14 @@ fn remove_id_from_alerts(value: &mut Value) {
}
}

// Check if the stream exists and create a new stream if doesn't exist
pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), StreamError> {
if metadata::STREAM_INFO.stream_exists(stream_name) {
return Ok(());
}

create_stream(stream_name.to_string()).await
}

pub async fn create_stream(stream_name: String) -> Result<(), StreamError> {
pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> {
// fail to proceed if invalid stream name
validator::stream_name(&stream_name)?;

// Proceed to create log stream if it doesn't exist
let storage = CONFIG.storage().get_object_store();
if let Err(e) = storage.create_stream(&stream_name).await {
// Fail if unable to create log stream on object store backend
return Err(StreamError::Custom {
msg: format!("failed to create log stream {stream_name} due to err: {e}"),
status: StatusCode::INTERNAL_SERVER_ERROR,
});
if let Err(err) = storage.create_stream(&stream_name).await {
return Err(CreateStreamError::Storage { stream_name, err });
}
metadata::STREAM_INFO.add_stream(stream_name.to_string());

Expand All @@ -308,9 +293,20 @@ pub mod error {
};

#[derive(Debug, thiserror::Error)]
pub enum StreamError {
pub enum CreateStreamError {
#[error("Stream name validation failed due to {0}")]
StreamNameValidation(#[from] StreamNameValidationError),
#[error("failed to create log stream {stream_name} due to err: {err}")]
Storage {
stream_name: String,
err: ObjectStorageError,
},
}

#[derive(Debug, thiserror::Error)]
pub enum StreamError {
#[error("{0}")]
CreateStream(#[from] CreateStreamError),
#[error("Log stream {0} does not exist")]
StreamNotFound(String),
#[error("Log stream is not initialized, send an event to this logstream and try again")]
Expand Down Expand Up @@ -341,7 +337,12 @@ pub mod error {
impl actix_web::ResponseError for StreamError {
fn status_code(&self) -> http::StatusCode {
match self {
StreamError::StreamNameValidation(_) => StatusCode::BAD_REQUEST,
StreamError::CreateStream(CreateStreamError::StreamNameValidation(_)) => {
StatusCode::BAD_REQUEST
}
StreamError::CreateStream(CreateStreamError::Storage { .. }) => {
StatusCode::INTERNAL_SERVER_ERROR
}
StreamError::StreamNotFound(_) => StatusCode::NOT_FOUND,
StreamError::Custom { status, .. } => *status,
StreamError::UninitializedLogstream => StatusCode::METHOD_NOT_ALLOWED,
Expand Down

0 comments on commit 5e613e1

Please sign in to comment.