Skip to content

Commit

Permalink
Merge branch 'main' into compose-distributed
Browse files Browse the repository at this point in the history
  • Loading branch information
nitisht authored Apr 23, 2024
2 parents c096bcf + e355858 commit 1b11629
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 88 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ You can download and run the Parseable binary on your laptop.
- Linux or MacOS

```bash
curl https://raw.githubusercontent.com/parseablehq/parseable/main/scripts/download.sh | bash
curl -fsSL https://logg.ing/install | sh
```

- Windows

```pwsh
powershell -c "irm https://raw.githubusercontent.com/parseablehq/parseable/main/scripts/download.ps1 | iex"
powershell -c "irm https://logg.ing/install-windows | iex"
```

Once this runs successfully, you'll see dashboard at [http://localhost:8000 ↗︎](http://localhost:8000). You can login to the dashboard default credentials `admin`, `admin`.
Expand Down
39 changes: 13 additions & 26 deletions scripts/download.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,13 @@
# supported CPU architectures and operating systems
SUPPORTED_ARCH=("x86_64" "arm64")
SUPPORTED_OS=("linux" "darwin")
# Associate binaries with CPU architectures and operating systems
declare -A BINARIES=(
["x86_64-linux"]="Parseable_x86_64-unknown-linux-gnu"
["arm64-linux"]="Parseable_aarch64-unknown-linux-gnu"
["x86_64-darwin"]="Parseable_x86_64-apple-darwin"
["arm64-darwin"]="Parseable_aarch64-apple-darwin"
)
DOWNLOAD_BASE_URL="parseable.gateway.scarf.sh/"

# Get the system's CPU architecture and operating system
CPU_ARCH=$(uname -m)
OS=$(uname -s | tr '[:upper:]' '[:lower:]')

printf "\n=========================\n"
printf "Detected CPU architecture: %s\n" "$CPU_ARCH"
printf "Detected operating system: %s\n" "$OS"

Expand All @@ -34,38 +30,29 @@ if ! echo "${SUPPORTED_OS[@]}" | grep -q "\\b${OS}\\b"; then
echo "Error: Unsupported operating system (${OS})."
exit 1
fi

# Get the latest release information using GitHub API
release=$(curl -s "https://api.github.com/repos/parseablehq/parseable/releases/latest")
# find the release tag
release_tag=$(echo "$release" | grep -o "\"tag_name\":\s*\"[^\"]*\"" | cut -d '"' -f 4)
printf "Found latest release version: $release_tag\n"

printf "Fetching release information for parseable...\n"

# Loop through binaries in the release and find the appropriate one
for arch_os in "${CPU_ARCH}-${OS}"; do
binary_name="${BINARIES[$arch_os]}"
download_url=$(echo "$release" | grep -o "\"browser_download_url\":\s*\"[^\"]*${binary_name}\"" | cut -d '"' -f 4)
if [ -n "$download_url" ]; then
break
fi
done
download_url=${DOWNLOAD_BASE_URL}${CPU_ARCH}-${OS}.${release_tag}

printf "Checking for existing installation...\n"
if [[ -d ${INSTALL_DIR} ]]; then
printf "A Previous version of parseable already exists. Run 'parseable --version' to check the version."
printf "or consider removing that before Installing"
printf "or consider removing that before new installation\n"
exit 1

else
printf "No Previous installation found\n"
printf "Installing parseable...\n"
mkdir -p ${BIN_DIR}
fi


# Download the binary using curl or wget
printf "Downloading Parseable version $release_tag, for OS: $OS, CPU architecture: $CPU_ARCH\n\n"
if command -v curl &>/dev/null; then
curl -L -o "${BIN_NAME}" "$download_url" 2&>> /dev/null
curl -L -o "${BIN_NAME}" "$download_url"
elif command -v wget &>/dev/null; then
wget -O "${BIN_NAME}" "$download_url" 2&>> /dev/null
wget -O "${BIN_NAME}" "$download_url"
else
echo "Error: Neither curl nor wget found. Please install either curl or wget."
exit 1
Expand All @@ -79,4 +66,4 @@ printf "Adding parseable to the path\n"
PATH_STR="export PATH=${BIN_DIR}"':$PATH'
echo ${PATH_STR} >> ${RC_FILE_PATH}

echo "parseable was added to the path. Please refresh the environment by sourcing the ${RC_PATH}"
echo "parseable was added to the path. Please refresh the environment by sourcing the ${RC_FILE_PATH}"
122 changes: 65 additions & 57 deletions server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::event::commit_schema;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::{Mode, CONFIG};
use crate::query::error::ExecuteError;
use crate::query::Query as LogicalQuery;
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
Expand Down Expand Up @@ -67,69 +68,37 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
let raw_logical_plan = session_state
.create_logical_plan(&query_request.query)
.await?;

// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
let table_name = visitor
.into_inner()
.pop()
.ok_or(QueryError::MalformedQuery(
"No table found from sql".to_string(),
))?;

let tables = visitor.into_inner();

if CONFIG.parseable.mode == Mode::Query {
if let Ok(new_schema) = fetch_schema(&table_name).await {
// commit schema merges the schema internally and updates the schema in storage.
commit_schema_to_storage(&table_name, new_schema.clone())
.await
.map_err(QueryError::ObjectStorage)?;
commit_schema(&table_name, Arc::new(new_schema)).map_err(QueryError::EventError)?;
for table in tables {
if let Ok(new_schema) = fetch_schema(&table).await {
// commit schema merges the schema internally and updates the schema in storage.
commit_schema_to_storage(&table, new_schema.clone())
.await
.map_err(QueryError::ObjectStorage)?;
commit_schema(&table, Arc::new(new_schema)).map_err(QueryError::EventError)?;
}
}
}

let mut query = into_query(&query_request, &session_state).await?;
let mut query: LogicalQuery = into_query(&query_request, &session_state).await?;

let creds = extract_session_key_from_req(&req).expect("expects basic auth");
let permissions = Users.get_permissions(&creds);
let permissions: Vec<Permission> = Users.get_permissions(&creds);

// check authorization of this query if it references physical table;
let table_name = query.table_name();
if let Some(ref table) = table_name {
let mut authorized = false;
let mut tags = Vec::new();

// in permission check if user can run query on the stream.
// also while iterating add any filter tags for this stream
for permission in permissions {
match permission {
Permission::Stream(Action::All, _) => {
authorized = true;
break;
}
Permission::StreamWithTag(Action::Query, ref stream, tag)
if stream == table || stream == "*" =>
{
authorized = true;
if let Some(tag) = tag {
tags.push(tag)
}
}
_ => (),
}
}

if !authorized {
return Err(QueryError::Unauthorized);
}

if !tags.is_empty() {
query.filter_tag = Some(tags)
}
}
let table_name = query
.first_table_name()
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query".to_string()))?;
authorize_and_set_filter_tags(&mut query, permissions, &table_name)?;

let time = Instant::now();

let (records, fields) = query.execute(table_name.clone().unwrap()).await?;
let (records, fields) = query.execute(table_name.clone()).await?;
let response = QueryResponse {
records,
fields,
Expand All @@ -138,16 +107,55 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
}
.to_http();

if let Some(table) = table_name {
let time = time.elapsed().as_secs_f64();
QUERY_EXECUTE_TIME
.with_label_values(&[&table])
.observe(time);
}
let time = time.elapsed().as_secs_f64();

QUERY_EXECUTE_TIME
.with_label_values(&[&table_name])
.observe(time);

Ok(response)
}

fn authorize_and_set_filter_tags(
query: &mut LogicalQuery,
permissions: Vec<Permission>,
table_name: &str,
) -> Result<(), QueryError> {
// check authorization of this query if it references physical table;
let mut authorized = false;
let mut tags = Vec::new();

// in permission check if user can run query on the stream.
// also while iterating add any filter tags for this stream
for permission in permissions {
match permission {
Permission::Stream(Action::All, _) => {
authorized = true;
break;
}
Permission::StreamWithTag(Action::Query, ref stream, tag)
if stream == table_name || stream == "*" =>
{
authorized = true;
if let Some(tag) = tag {
tags.push(tag)
}
}
_ => (),
}
}

if !authorized {
return Err(QueryError::Unauthorized);
}

if !tags.is_empty() {
query.filter_tag = Some(tags)
}

Ok(())
}

impl FromRequest for Query {
type Error = actix_web::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;
Expand Down Expand Up @@ -178,7 +186,7 @@ impl FromRequest for Query {
async fn into_query(
query: &Query,
session_state: &SessionState,
) -> Result<crate::query::Query, QueryError> {
) -> Result<LogicalQuery, QueryError> {
if query.query.is_empty() {
return Err(QueryError::EmptyQuery);
}
Expand Down
6 changes: 3 additions & 3 deletions server/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl Query {
}
}

pub fn table_name(&self) -> Option<String> {
pub fn first_table_name(&self) -> Option<String> {
let mut visitor = TableScanVisitor::default();
let _ = self.raw_logical_plan.visit(&mut visitor);
visitor.into_inner().pop()
Expand All @@ -192,7 +192,7 @@ impl TreeNodeVisitor for TableScanVisitor {
match node {
LogicalPlan::TableScan(table) => {
self.tables.push(table.table_name.table().to_string());
Ok(VisitRecursion::Stop)
Ok(VisitRecursion::Skip)
}
_ => Ok(VisitRecursion::Continue),
}
Expand Down Expand Up @@ -290,7 +290,7 @@ fn table_contains_any_time_filters(
})
.any(|expr| {
matches!(&*expr.left, Expr::Column(Column { name, .. })
if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) ||
if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) ||
(!time_partition.is_some() && name == event::DEFAULT_TIMESTAMP_KEY)))
})
}
Expand Down

0 comments on commit 1b11629

Please sign in to comment.