diff --git a/src/cli/analytics.rs b/src/cli/analytics.rs index 30fdd449..1623ec6b 100644 --- a/src/cli/analytics.rs +++ b/src/cli/analytics.rs @@ -9,8 +9,10 @@ use nu_engine::command_prelude::Call; use nu_engine::CallExt; use nu_protocol::engine::{Command, EngineState, Stack}; use nu_protocol::{ - Category, ListStream, PipelineData, ShellError, Signature, Span, SyntaxShape, Value, + Category, IntoPipelineData, ListStream, PipelineData, ShellError, Signature, Span, SyntaxShape, + Value, }; +use nu_utils::SharedCow; use std::str::from_utf8; use std::sync::{Arc, Mutex}; use tokio::runtime::Runtime; @@ -128,9 +130,11 @@ fn run( let statement: String = call.req(engine_state, stack, 0)?; let scope: Option = call.get_flag(engine_state, stack, "scope")?; + let with_meta = call.has_flag(engine_state, stack, "with-meta")?; debug!("Running analytics query {}", &statement); + let mut results: Vec = vec![]; let mut streams = StreamMap::new(); let rt = Arc::new(Runtime::new().unwrap()); for identifier in cluster_identifiers.clone() { @@ -164,11 +168,74 @@ fn run( inner: vec![], })?; + if with_meta { + let mut query_result_values = vec![]; + while let Some(query_result) = rt + .block_on(async { json_streamer.read_row().await }) + .map_err(|e| ShellError::GenericError { + error: format!("failed to read analytics query result: {}", e), + msg: "".to_string(), + span: None, + help: None, + inner: vec![], + })? + { + let result_as_string = from_utf8(&query_result).unwrap(); + let result_as_json = + serde_json::from_str::(&result_as_string).unwrap(); + query_result_values + .push(convert_json_value_to_nu_value(&result_as_json, span).unwrap()) + } + + let meta = rt + .block_on(async { json_streamer.read_epilog().await }) + .map_err(|e| ShellError::GenericError { + error: format!("failed to read stream epilog: {}", e), + msg: "".to_string(), + span: None, + help: None, + inner: vec![], + })?; + let meta_json = + serde_json::from_str::(&from_utf8(&meta).unwrap()).unwrap(); + let meta_value = convert_json_value_to_nu_value(&meta_json, span).unwrap(); + let meta_as_record: &mut nu_protocol::Record = &mut meta_value.into_record().unwrap(); + + meta_as_record.push( + "cluster", + Value::String { + val: identifier.clone(), + internal_span: span, + }, + ); + + meta_as_record.push( + "results", + Value::List { + vals: query_result_values, + internal_span: span, + }, + ); + + results.push(Value::Record { + val: SharedCow::new(meta_as_record.clone()), + internal_span: span, + }) + } + streams.insert(identifier, json_streamer); } let result_stream = AnalyticsStream { streams, span, rt }; + if with_meta { + return Ok(Value::List { + vals: results, + internal_span: span, + } + .into_pipeline_data()); + } + Ok(PipelineData::from(ListStream::new( result_stream, span,