Skip to content

Commit

Permalink
Fix with meta flag for analytics queries
Browse files Browse the repository at this point in the history
  • Loading branch information
Westwooo committed Dec 6, 2024
1 parent b18899e commit ef82ffb
Showing 1 changed file with 50 additions and 1 deletion.
51 changes: 50 additions & 1 deletion src/cli/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use nu_engine::command_prelude::Call;
use nu_engine::CallExt;
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{
Category, ListStream, PipelineData, ShellError, Signature, Span, SyntaxShape, Value,
Category, IntoPipelineData, ListStream, PipelineData, ShellError, Signature, Span, SyntaxShape,
Value,
};
use std::str::from_utf8;
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -128,9 +129,11 @@ fn run(
let statement: String = call.req(engine_state, stack, 0)?;

let scope: Option<String> = call.get_flag(engine_state, stack, "scope")?;
let with_meta = call.has_flag(engine_state, stack, "with-meta")?;

debug!("Running analytics query {}", &statement);

let mut results: Vec<Value> = vec![];
let mut streams = StreamMap::new();
let rt = Arc::new(Runtime::new().unwrap());
for identifier in cluster_identifiers.clone() {
Expand Down Expand Up @@ -164,11 +167,57 @@ fn run(
inner: vec![],
})?;

if with_meta {
let mut query_results = vec![];
while let Some(mut query_result) = rt
.block_on(async { json_streamer.read_row().await })
.map_err(|e| ShellError::GenericError {
error: format!("failed to read analytics query result: {}", e),
msg: "".to_string(),
span: None,
help: None,
inner: vec![],
})?
{
query_results.append(&mut query_result)
}

let epilog = rt
.block_on(async { json_streamer.read_epilog().await })
.map_err(|e| ShellError::GenericError {
error: format!("failed to read stream epilog: {}", e),
msg: "".to_string(),
span: None,
help: None,
inner: vec![],
})?;

let (start, _) = from_utf8(&epilog).unwrap().split_at(epilog.len() - 1);
let results_with_meta = format!(
"{}, \"cluster\": \"{}\", \"results\": {} }}",
start,
identifier,
String::from_utf8(query_results).unwrap()
);
let json_query_results =
serde_json::from_str::<serde_json::Value>(&results_with_meta).unwrap();
let value = convert_json_value_to_nu_value(&json_query_results, span).unwrap();
results.push(value)
}

streams.insert(identifier, json_streamer);
}

let result_stream = AnalyticsStream { streams, span, rt };

if with_meta {
return Ok(Value::List {
vals: results,
internal_span: span,
}
.into_pipeline_data());
}

Ok(PipelineData::from(ListStream::new(
result_stream,
span,
Expand Down

0 comments on commit ef82ffb

Please sign in to comment.