diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs index eb73675b5..2c1a55f48 100644 --- a/native/core/src/execution/jni_api.rs +++ b/native/core/src/execution/jni_api.rs @@ -231,7 +231,7 @@ fn prepare_output( array_addrs: jlongArray, schema_addrs: jlongArray, output_batch: RecordBatch, - exec_context: &mut ExecutionContext, + validate: bool, ) -> CometResult { let array_address_array = unsafe { JLongArray::from_raw(array_addrs) }; let num_cols = env.get_array_length(&array_address_array)? as usize; @@ -255,7 +255,7 @@ fn prepare_output( ))); } - if exec_context.debug_native { + if validate { // Validate the output arrays. for array in results.iter() { let array_data = array.to_data(); @@ -275,9 +275,6 @@ fn prepare_output( i += 1; } - // Update metrics - update_metrics(env, exec_context)?; - Ok(num_rows as jlong) } @@ -356,22 +353,22 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( let next_item = exec_context.stream.as_mut().unwrap().next(); let poll_output = exec_context.runtime.block_on(async { poll!(next_item) }); + // Update metrics + update_metrics(&mut env, exec_context)?; + match poll_output { Poll::Ready(Some(output)) => { + // prepare output for FFI transfer return prepare_output( &mut env, array_addrs, schema_addrs, output?, - exec_context, + exec_context.debug_native, ); } Poll::Ready(None) => { // Reaches EOF of output. - - // Update metrics - update_metrics(&mut env, exec_context)?; - if exec_context.explain_native { if let Some(plan) = &exec_context.root_op { let formatted_plan_str = @@ -391,9 +388,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan( // A poll pending means there are more than one blocking operators, // we don't need go back-forth between JVM/Native. Just keeping polling. Poll::Pending => { - // Update metrics - update_metrics(&mut env, exec_context)?; - // Pull input batches pull_input_batches(exec_context)?;