Skip to content

Commit

Permalink
Fix with meta flag for analytics queries
Browse files Browse the repository at this point in the history
  • Loading branch information
Westwooo committed Dec 11, 2024
1 parent 2af7eee commit 906e4b5
Showing 1 changed file with 68 additions and 1 deletion.
69 changes: 68 additions & 1 deletion src/cli/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ use nu_engine::command_prelude::Call;
use nu_engine::CallExt;
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{
Category, ListStream, PipelineData, ShellError, Signature, Span, SyntaxShape, Value,
Category, IntoPipelineData, ListStream, PipelineData, ShellError, Signature, Span, SyntaxShape,
Value,
};
use nu_utils::SharedCow;
use std::str::from_utf8;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -128,9 +130,11 @@ fn run(
let statement: String = call.req(engine_state, stack, 0)?;

let scope: Option<String> = call.get_flag(engine_state, stack, "scope")?;
let with_meta = call.has_flag(engine_state, stack, "with-meta")?;

debug!("Running analytics query {}", &statement);

let mut results: Vec<Value> = vec![];
let mut streams = StreamMap::new();
let rt = Arc::new(Runtime::new().unwrap());
for identifier in cluster_identifiers.clone() {
Expand Down Expand Up @@ -164,11 +168,74 @@ fn run(
inner: vec![],
})?;

if with_meta {
let mut query_result_values = vec![];
while let Some(query_result) = rt
.block_on(async { json_streamer.read_row().await })
.map_err(|e| ShellError::GenericError {
error: format!("failed to read analytics query result: {}", e),
msg: "".to_string(),
span: None,
help: None,
inner: vec![],
})?
{
let result_as_string = from_utf8(&query_result).unwrap();
let result_as_json =
serde_json::from_str::<serde_json::Value>(&result_as_string).unwrap();
query_result_values
.push(convert_json_value_to_nu_value(&result_as_json, span).unwrap())
}

let meta = rt
.block_on(async { json_streamer.read_epilog().await })
.map_err(|e| ShellError::GenericError {
error: format!("failed to read stream epilog: {}", e),
msg: "".to_string(),
span: None,
help: None,
inner: vec![],
})?;
let meta_json =
serde_json::from_str::<serde_json::Value>(&from_utf8(&meta).unwrap()).unwrap();
let meta_value = convert_json_value_to_nu_value(&meta_json, span).unwrap();
let meta_as_record: &mut nu_protocol::Record = &mut meta_value.into_record().unwrap();

meta_as_record.push(
"cluster",
Value::String {
val: identifier.clone(),
internal_span: span,
},
);

meta_as_record.push(
"results",
Value::List {
vals: query_result_values,
internal_span: span,
},
);

results.push(Value::Record {
val: SharedCow::new(meta_as_record.clone()),
internal_span: span,
})
}

streams.insert(identifier, json_streamer);
}

let result_stream = AnalyticsStream { streams, span, rt };

if with_meta {
return Ok(Value::List {
vals: results,
internal_span: span,
}
.into_pipeline_data());
}

Ok(PipelineData::from(ListStream::new(
result_stream,
span,
Expand Down

0 comments on commit 906e4b5

Please sign in to comment.