Commit

move diagram function to new module

milenkovicm committed Dec 10, 2024
1 parent a102d3a commit 9df888c
Showing 5 changed files with 172 additions and 153 deletions.
148 changes: 148 additions & 0 deletions ballista/core/src/diagram.rs
@@ -0,0 +1,148 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::error::Result;
use crate::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec};

use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
use datafusion::physical_plan::aggregates::AggregateExec;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::joins::HashJoinExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::ExecutionPlan;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

pub fn produce_diagram(filename: &str, stages: &[Arc<ShuffleWriterExec>]) -> Result<()> {
    let write_file = File::create(filename)?;
    let mut w = BufWriter::new(&write_file);
    writeln!(w, "digraph G {{")?;

    // draw stages and entities
    for stage in stages {
        writeln!(w, "\tsubgraph cluster{} {{", stage.stage_id())?;
        writeln!(w, "\t\tlabel = \"Stage {}\";", stage.stage_id())?;
        let mut id = AtomicUsize::new(0);
        build_exec_plan_diagram(
            &mut w,
            stage.children()[0].as_ref(),
            stage.stage_id(),
            &mut id,
            true,
        )?;
        writeln!(w, "\t}}")?;
    }

    // draw relationships
    for stage in stages {
        let mut id = AtomicUsize::new(0);
        build_exec_plan_diagram(
            &mut w,
            stage.children()[0].as_ref(),
            stage.stage_id(),
            &mut id,
            false,
        )?;
    }

    write!(w, "}}")?;
    Ok(())
}

fn build_exec_plan_diagram(
    w: &mut BufWriter<&File>,
    plan: &dyn ExecutionPlan,
    stage_id: usize,
    id: &mut AtomicUsize,
    draw_entity: bool,
) -> Result<usize> {
    let operator_str = if plan.as_any().downcast_ref::<AggregateExec>().is_some() {
        "AggregateExec"
    } else if plan.as_any().downcast_ref::<SortExec>().is_some() {
        "SortExec"
    } else if plan.as_any().downcast_ref::<ProjectionExec>().is_some() {
        "ProjectionExec"
    } else if plan.as_any().downcast_ref::<HashJoinExec>().is_some() {
        "HashJoinExec"
    } else if plan.as_any().downcast_ref::<ParquetExec>().is_some() {
        "ParquetExec"
    } else if plan.as_any().downcast_ref::<CsvExec>().is_some() {
        "CsvExec"
    } else if plan.as_any().downcast_ref::<FilterExec>().is_some() {
        "FilterExec"
    } else if plan.as_any().downcast_ref::<ShuffleWriterExec>().is_some() {
        "ShuffleWriterExec"
    } else if plan
        .as_any()
        .downcast_ref::<UnresolvedShuffleExec>()
        .is_some()
    {
        "UnresolvedShuffleExec"
    } else if plan
        .as_any()
        .downcast_ref::<CoalesceBatchesExec>()
        .is_some()
    {
        "CoalesceBatchesExec"
    } else if plan
        .as_any()
        .downcast_ref::<CoalescePartitionsExec>()
        .is_some()
    {
        "CoalescePartitionsExec"
    } else {
        println!("Unknown: {plan:?}");
        "Unknown"
    };

    let node_id = id.load(Ordering::SeqCst);
    id.store(node_id + 1, Ordering::SeqCst);

    if draw_entity {
        writeln!(
            w,
            "\t\tstage_{stage_id}_exec_{node_id} [shape=box, label=\"{operator_str}\"];"
        )?;
    }
    for child in plan.children() {
        if let Some(shuffle) = child.as_any().downcast_ref::<UnresolvedShuffleExec>() {
            if !draw_entity {
                writeln!(
                    w,
                    "\tstage_{}_exec_1 -> stage_{}_exec_{};",
                    shuffle.stage_id, stage_id, node_id
                )?;
            }
        } else {
            // relationships within same entity
            let child_id =
                build_exec_plan_diagram(w, child.as_ref(), stage_id, id, draw_entity)?;
            if draw_entity {
                writeln!(
                    w,
                    "\t\tstage_{stage_id}_exec_{child_id} -> stage_{stage_id}_exec_{node_id};"
                )?;
            }
        }
    }
    Ok(node_id)
}
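For context, `produce_diagram` writes Graphviz DOT text: one `subgraph cluster` per query stage, a box node per operator, and edges drawn child-to-parent. Below is a minimal, self-contained sketch (std only, hypothetical stage and operator names) of the shape of that output; the resulting file can be rendered with `dot -Tpng plan.dot -o plan.png`:

use std::fs::File;
use std::io::{BufWriter, Write};

fn main() -> std::io::Result<()> {
    // Emit a tiny DOT graph with the same structure produce_diagram generates:
    // a cluster for "Stage 0" holding a FilterExec fed by a ParquetExec.
    let file = File::create("plan.dot")?;
    let mut w = BufWriter::new(&file);
    writeln!(w, "digraph G {{")?;
    writeln!(w, "\tsubgraph cluster0 {{")?;
    writeln!(w, "\t\tlabel = \"Stage 0\";")?;
    writeln!(w, "\t\tstage_0_exec_0 [shape=box, label=\"FilterExec\"];")?;
    writeln!(w, "\t\tstage_0_exec_1 [shape=box, label=\"ParquetExec\"];")?;
    writeln!(w, "\t\tstage_0_exec_1 -> stage_0_exec_0;")?;
    writeln!(w, "\t}}")?;
    writeln!(w, "}}")?;
    Ok(())
}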
5 changes: 2 additions & 3 deletions ballista/core/src/lib.rs
@@ -29,15 +29,14 @@ pub fn print_version() {
 pub mod client;
 pub mod config;
 pub mod consistent_hash;
+pub mod diagram;
 pub mod error;
 pub mod event_loop;
 pub mod execution_plans;
 pub mod extension;
 pub mod registry;
-
+pub mod utils;
 #[macro_use]
 pub mod serde;
-pub mod utils;
-
 ///
 /// [RuntimeProducer] is a factory which creates runtime [RuntimeEnv]
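With the module now exported from `ballista_core`, callers that previously reached `produce_diagram` through `utils` would switch to the new path; a one-line sketch of the updated import:

use ballista_core::diagram::produce_diagram;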
129 changes: 1 addition & 128 deletions ballista/core/src/utils.rs
@@ -17,9 +17,7 @@

 use crate::config::BallistaConfig;
 use crate::error::{BallistaError, Result};
-use crate::execution_plans::{
-    DistributedQueryExec, ShuffleWriterExec, UnresolvedShuffleExec,
-};
+use crate::execution_plans::DistributedQueryExec;
 
 use crate::extension::SessionConfigExt;
 use crate::serde::scheduler::PartitionStats;
Expand All @@ -32,29 +30,19 @@ use datafusion::arrow::ipc::writer::StreamWriter;
 use datafusion::arrow::ipc::CompressionType;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::tree_node::{TreeNode, TreeNodeVisitor};
-use datafusion::datasource::physical_plan::{CsvExec, ParquetExec};
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::{QueryPlanner, SessionConfig, SessionState};
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::logical_expr::{DdlStatement, LogicalPlan, TableScan};
-use datafusion::physical_plan::aggregates::AggregateExec;
-use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
-use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::empty::EmptyExec;
-use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::joins::HashJoinExec;
 use datafusion::physical_plan::metrics::MetricsSet;
-use datafusion::physical_plan::projection::ProjectionExec;
-use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{metrics, ExecutionPlan, RecordBatchStream};
 use datafusion::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
 use datafusion_proto::logical_plan::{AsLogicalPlan, LogicalExtensionCodec};
 use futures::StreamExt;
 use log::error;
-use std::io::{BufWriter, Write};
 use std::marker::PhantomData;
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use std::{fs::File, pin::Pin};
@@ -129,121 +117,6 @@ pub async fn collect_stream(
     Ok(batches)
 }

-    [removed here: the produce_diagram and build_exec_plan_diagram functions, moved verbatim to ballista/core/src/diagram.rs above]

 pub struct BallistaQueryPlanner<T: AsLogicalPlan> {
     scheduler_url: String,
     config: BallistaConfig,
22 changes: 10 additions & 12 deletions examples/examples/custom-executor.rs
@@ -36,19 +36,17 @@ async fn main() -> ballista_core::error::Result<()> {
         .is_test(true)
         .try_init();
 
-    let mut config: ExecutorProcessConfig = ExecutorProcessConfig::default();
-
-    // overriding default config producer with custom producer
-    // which has required S3 configuration options
-    config.override_config_producer =
-        Some(Arc::new(custom_session_config_with_s3_options));
-
-    // overriding default runtime producer with custom producer
-    // which knows how to create S3 connections
-    config.override_runtime_producer =
-        Some(Arc::new(|session_config: &SessionConfig| {
+    let config: ExecutorProcessConfig = ExecutorProcessConfig {
+        // overriding default config producer with custom producer
+        // which has required S3 configuration options
+        override_config_producer: Some(Arc::new(custom_session_config_with_s3_options)),
+        // overriding default runtime producer with custom producer
+        // which knows how to create S3 connections
+        override_runtime_producer: Some(Arc::new(|session_config: &SessionConfig| {
             custom_runtime_env_with_s3_support(session_config)
-        }));
+        })),
+        ..Default::default()
+    };
 
     start_executor_process(Arc::new(config)).await
 }
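This refactor (and the matching one in custom-scheduler.rs below) uses Rust's struct-update syntax: name the fields you override and let `..Default::default()` fill the rest. A minimal, self-contained sketch of the pattern with a hypothetical `Config` type:

#[derive(Default)]
struct Config {
    retries: u32,
    verbose: bool,
}

fn main() {
    // Only `retries` is set explicitly; `verbose` falls back to its
    // Default value (false).
    let config = Config {
        retries: 3,
        ..Default::default()
    };
    assert_eq!(config.retries, 3);
    assert!(!config.verbose);
}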
21 changes: 11 additions & 10 deletions examples/examples/custom-scheduler.rs
@@ -38,22 +38,23 @@ async fn main() -> ballista_core::error::Result<()> {
         .is_test(true)
         .try_init();
 
-    let mut config: SchedulerConfig = SchedulerConfig::default();
+    let config: SchedulerConfig = SchedulerConfig {
+        // overriding default runtime producer with custom producer
+        // which knows how to create S3 connections
+        override_config_producer: Some(Arc::new(custom_session_config_with_s3_options)),
+        // overriding default session builder, which has custom session configuration
+        // runtime environment and session state.
+        override_session_builder: Some(Arc::new(|session_config: SessionConfig| {
+            custom_session_state_with_s3_support(session_config)
+        })),
+        ..Default::default()
+    };
 
     let addr = format!("{}:{}", config.bind_host, config.bind_port);
     let addr = addr
         .parse()
         .map_err(|e: AddrParseError| BallistaError::Configuration(e.to_string()))?;
 
-    // overriding default runtime producer with custom producer
-    // which knows how to create S3 connections
-    config.override_config_producer =
-        Some(Arc::new(custom_session_config_with_s3_options));
-    // overriding default session builder, which has custom session configuration
-    // runtime environment and session state.
-    config.override_session_builder = Some(Arc::new(|session_config: SessionConfig| {
-        custom_session_state_with_s3_support(session_config)
-    }));
     let cluster = BallistaCluster::new_from_config(&config).await?;
     start_server(cluster, addr, Arc::new(config)).await?;
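The unchanged context above also shows the bind address being assembled and parsed; a standalone sketch of that parse-and-map-error pattern (hypothetical address, plain `String` error standing in for `BallistaError::Configuration`):

use std::net::{AddrParseError, SocketAddr};

fn main() {
    // Parse "host:port" into a SocketAddr, converting the parse error
    // the same way the example maps it into a configuration error.
    let parsed: Result<SocketAddr, String> = "127.0.0.1:50050"
        .parse()
        .map_err(|e: AddrParseError| e.to_string());
    assert!(parsed.is_ok());
}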

