Merge pull request #2 from carsonwang/revert-1-mergeStreamingWithMaster
Revert "Merge streaming with master"
carsonwang authored Nov 8, 2016
2 parents 6634fe9 + 38c8a27 commit 1f49a4f
Showing 177 changed files with 43,376 additions and 4,746 deletions.
49 changes: 22 additions & 27 deletions bin/functions/hibench_prop_env_mapping.py
@@ -123,36 +123,31 @@
MODEL="hibench.nweight.model",

# For streaming bench
STREAMING_TESTCASE="hibench.streambench.testCase",

# zkHelper
STREAMING_ZKHELPER_JAR="hibench.streamingbench.zkhelper.jar",
# prepare
STREAMING_TOPIC_NAME="hibench.streambench.kafka.topic",
STREAMING_KAFKA_HOME="hibench.streambench.kafka.home",
STREAMING_ZKADDR="hibench.streambench.zkHost",
STREAMING_CONSUMER_GROUP="hibench.streambench.kafka.consumerGroup",
STREAMING_DATA_DIR="hibench.streambench.datagen.dir",
STREAMING_DATA1_NAME="hibench.streambench.datagen.data1.name",
STREAMING_DATA1_DIR="hibench.streambench.datagen.data1.dir",
STREAMING_DATA1_LENGTH="hibench.streambench.datagen.recordLength",
STREAMING_DATA2_SAMPLE_DIR="hibench.streambench.datagen.data2_samples.dir",
STREAMING_DATA2_CLUSTER_DIR="hibench.streambench.datagen.data2_cluster.dir",
STREAMING_PARTITIONS="hibench.streambench.kafka.topicPartitions",
DATA_GEN_JAR="hibench.streambench.datagen.jar",
STREAMING_TOPIC_NAME="hibench.streamingbench.topic_name",
STREAMING_KAFKA_HOME="hibench.streamingbench.kafka.home",
STREAMING_ZKADDR="hibench.streamingbench.zookeeper.host",
STREAMING_CONSUMER_GROUP="hibench.streamingbench.consumer_group",
STREAMING_DATA_SCALE_FACTOR="hibench.streamingbench.datagen.scale_factor",
STREAMING_DATA_DIR="hibench.streamingbench.datagen.dir",
STREAMING_DATA1_NAME="hibench.streamingbench.datagen.data1.name",
STREAMING_DATA1_DIR="hibench.streamingbench.datagen.data1.dir",
STREAMING_DATA1_LENGTH="hibench.streamingbench.datagen.data1.length",
STREAMING_DATA2_SAMPLE_DIR="hibench.streamingbench.datagen.data2_samples.dir",
STREAMING_DATA2_CLUSTER_DIR="hibench.streamingbench.datagen.data2_cluster.dir",
STREAMING_PARTITIONS="hibench.streamingbench.partitions",
DATA_GEN_JAR="hibench.streamingbench.datagen.jar",

STREAMING_DATAGEN_MODE="hibench.streamingbench.prepare.mode",
STREAMING_DATAGEN_RECORDS="hibench.streamingbench.prepare.push.records",

# sparkstreaming
STREAMBENCH_SPARK_JAR="hibench.streambench.sparkbench.jar",
STREAMBENCH_STORM_JAR="hibench.streambench.stormbench.jar",

# gearpump
GEARPUMP_HOME="hibench.streambench.gearpump.home",
STREAMBENCH_GEARPUMP_JAR="hibench.streambench.gearpump.jar",
STREAMBENCH_GEARPUMP_EXECUTORS="hibench.streambench.gearpump.executors",

# flinkstreaming
HIBENCH_FLINK_MASTER="hibench.flink.master",
FLINK_HOME="hibench.streambench.flink.home",
STREAMBENCH_FLINK_JAR="hibench.streambench.flinkbench.jar",
STREAMBENCH_FLINK_PARALLELISM="hibench.streambench.flink.parallelism",
STREAMINGBENCH_JARS="hibench.streamingbench.jars",
STREAMBENCH_STORM_JAR="hibench.streamingbench.stormbench.jar",
STORM_BIN_HOME="hibench.streamingbench.storm.bin",
STREAMING_BENCHNAME="hibench.streamingbench.benchname",

# samza
STREAMING_SAMZA_WORDCOUNT_INTERNAL_TOPIC="samza_internal.wordcount.kafka.input.name",
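For context, the hunk above swaps the streambench-era environment mapping back to the streamingbench property names. A minimal sketch of how such a mapping can be consumed to produce shell exports (the dict and function names here are illustrative assumptions, not HiBench's actual loader code):

env_prop_mapping = {
    "STREAMING_TOPIC_NAME": "hibench.streamingbench.topic_name",
    "STREAMING_ZKADDR": "hibench.streamingbench.zookeeper.host",
}

def export_env(hibench_conf):
    # emit a shell export for every property that resolved to a value
    for env_name, prop_name in sorted(env_prop_mapping.items()):
        if prop_name in hibench_conf:
            print('export %s="%s"' % (env_name, hibench_conf[prop_name]))

export_env({"hibench.streamingbench.topic_name": "identity"})
# -> export STREAMING_TOPIC_NAME="identity"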
14 changes: 7 additions & 7 deletions bin/functions/load-config.py
@@ -239,9 +239,9 @@ def wildcard_replacement(key, value):
finish = False


# wildcard_rules = [(key, HibenchConf[key]) for key in HibenchConf if "*" in key]
wildcard_rules = [(key, HibenchConf[key]) for key in HibenchConf if "*" in key]
# now, let's check wildcard replacement rules
# for key, value in wildcard_rules:
for key, value in wildcard_rules:
# check if we found a rule like: aaa.*.ccc.*.ddd -> bbb.*.*

# wildcard replacement is useful for samza conf, which
@@ -253,12 +253,12 @@ def wildcard_replacement(key, value):
# switch the order of two wildcards, something like the
# first wildcard in key to match the second wildcard in
# value. I just don't think it'll be needed.
# if not wildcard_replacement(key, value): # not wildcard rules? re-add
if not wildcard_replacement(key, value): # not wildcard rules? re-add
HibenchConf[key] = value
# if wildcard_rules: # need try again
# wildcard_rules = []
# else: break
break
if wildcard_rules: # need try again
wildcard_rules = []
else: break

# all finished, remove values contains no_value_sign
for key in [x for x in HibenchConf if no_value_sign in HibenchConf[x]]:
del HibenchConf[key]
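The restored (uncommented) block above re-enables wildcard replacement rules, where a key such as aaa.*.ccc.*.ddd maps to a value like bbb.*.*. A rough, self-contained sketch of that idea, assuming position-by-position wildcard substitution (an illustration, not the actual implementation in load-config.py):

import re

def expand_wildcard_rule(rule_key, rule_value, conf):
    # build a regex from the rule key: "." is literal, each "*" captures one
    # dot-free segment that then fills the matching "*" in the value
    pattern = re.compile("^" + rule_key.replace(".", r"\.").replace("*", "([^.]+)") + "$")
    expanded = {}
    for key in conf:
        m = pattern.match(key)
        if m:
            value = rule_value
            for seg in m.groups():
                value = value.replace("*", seg, 1)  # fill wildcards left to right
            expanded[key] = value
    return expanded

conf = {"samza.job1.streams.input.system": ""}
print(expand_wildcard_rule("samza.*.streams.*.system", "kafka-*-*", conf))
# -> {'samza.job1.streams.input.system': 'kafka-job1-input'}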
16 changes: 2 additions & 14 deletions bin/functions/workload-functions.sh
@@ -271,27 +271,15 @@ function run-spark-job() {
}

function run-streaming-job (){
run-spark-job --jars ${STREAMBENCH_SPARK_JAR} $@
run-spark-job --jars ${STREAMINGBENCH_JARS} $@
}

function run-storm-job(){
CMD="${STORM_HOME}/bin/storm jar ${STREAMBENCH_STORM_JAR} $@"
CMD="${STORM_BIN_HOME}/storm jar ${STREAMBENCH_STORM_JAR} $@"
echo -e "${BGreen}Submit Storm Job: ${Green}$CMD${Color_Off}"
execute_withlog $CMD
}

function run-gearpump-app(){
CMD="${GEARPUMP_HOME}/bin/gear app -executors ${STREAMBENCH_GEARPUMP_EXECUTORS} -jar ${STREAMBENCH_GEARPUMP_JAR} $@"
echo -e "${BGreen}Submit Gearpump Application: ${Green}$CMD${Color_Off}"
execute_withlog $CMD
}

function run-flink-job(){
CMD="${FLINK_HOME}/bin/flink run -p ${STREAMBENCH_FLINK_PARALLELISM} -m ${HIBENCH_FLINK_MASTER} $@ ${STREAMBENCH_FLINK_JAR} ${SPARKBENCH_PROPERTIES_FILES}"
echo -e "${BGreen}Submit Flink Job: ${Green}$CMD${Color_Off}"
execute_withlog $CMD
}

function run-hadoop-job(){
ENABLE_MONITOR=1
if [ "$1" = "--without-monitor" ]; then
6 changes: 0 additions & 6 deletions conf/00-default-properties.conf
@@ -257,9 +257,3 @@ spark.default.parallelism ${hibench.default.map.parallelism}

# set spark sql's default shuffle partitions according to hibench's parallelism value
spark.sql.shuffle.partitions ${hibench.default.map.parallelism}


#=======================================================
# Flink
#=======================================================
hibench.flink.master FLINK_JM_HOST:PORT
202 changes: 98 additions & 104 deletions conf/01-default-streamingbench.conf
@@ -1,152 +1,146 @@
#########################################################
# General Stream Config
#########################################################
# Two data sets (text and numeric) are available; the app argument indicates which one to use
#app=micro-sketch #use text dataset, avg record size: 60 bytes
#app=micro-statistics #use numeric dataset, avg record size: 200 bytes
hibench.streamingbench.app micro-sketch

# Note: ensure benchName is consistent with the datagen type (numeric data for statistics, text data for the others)
# (available benchName values: identity, repartition) TBD: sample, project, grep, wordcount, distinctcount, statistics
hibench.streambench.testCase identity
# Text dataset can be scaled in terms of record size
hibench.streamingbench.prepare.textdataset_recordsize_factor

# ZooKeeper address for the Kafka service (default: HOSTNAME:HOSTPORT)
hibench.streambench.zkHost HOSTNAME:HOSTPORT
# Two generator modes are available: push, periodic
# Push sends data to the Kafka cluster as fast as it can
# Periodic sends data according to the sending-rate specification
#hibench.streamingbench.prepare.mode push
hibench.streamingbench.prepare.mode periodic

# Probability used in sample test case
hibench.streambench.sampleProbability 0.1
# Under push mode: total number of records that will be generated
hibench.streamingbench.prepare.push.records 900000000

# Indicate whether debug mode is enabled for correctness verification (default: false)
hibench.streambench.debugMode false
# The following three params apply to periodic mode
# Records to push per interval
hibench.streamingbench.prepare.periodic.recordPerInterval 600000

# JARS
hibench.streambench.datagen.jar ${hibench.home}/src/streambench/datagen/target/streaming-bench-datagen-5.0-SNAPSHOT-jar-with-dependencies.jar
hibench.streambench.sparkbench.jar ${hibench.home}/src/streambench/sparkbench/target/streaming-bench-spark-5.0-SNAPSHOT-${hibench.spark.version}-jar-with-dependencies.jar
hibench.streambench.stormbench.jar ${hibench.home}/src/streambench/stormbench/target/streaming-bench-storm-5.0-SNAPSHOT.jar
hibench.streambench.gearpump.jar ${hibench.home}/src/streambench/gearpumpbench/target/streaming-bench-gearpump-5.0-SNAPSHOT-jar-with-dependencies.jar
hibench.streambench.flinkbench.jar ${hibench.home}/src/streambench/flinkbench/target/streaming-bench-flink-5.0-SNAPSHOT-jar-with-dependencies.jar
# Interval time (in ms)
hibench.streamingbench.prepare.periodic.intervalSpan 5000

#########################################################
# Kafka Config
#########################################################
# Total number of rounds of data sent
hibench.streamingbench.prepare.periodic.totalRound 100
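As a quick sanity check of the restored periodic-mode defaults above (assuming intervalSpan is in milliseconds, as its comment states), the implied sending rate and total volume work out as follows:

records_per_interval = 600000
interval_span_ms = 5000
total_rounds = 100

rate = records_per_interval / (interval_span_ms / 1000.0)
print("rate: %d records/s" % rate)                                   # 120000
print("total: %d records" % (records_per_interval * total_rounds))   # 60000000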

# Kafka home
hibench.streambench.kafka.home /PATH/TO/KAFKA/HOME

# the topic from which Spark will receive input data (default: ${hibench.streambench.testCase})
hibench.streambench.kafka.topic ${hibench.streambench.testCase}

# number of partitions of generated topic (default 20)
hibench.streambench.kafka.topicPartitions 20
# zookeeper host:port of kafka cluster

# consumer group of the consumer for kafka (default: HiBench)
hibench.streambench.kafka.consumerGroup HiBench
#example: hostname:9092
hibench.streamingbench.zookeeper.host HOSTNAME:HOSTPORT

# Kafka broker list, written in the form "host:port,host:port,..." (default: HOSTNAME:HOSTPORT)
hibench.streambench.kafka.brokerList HOSTNAME:HOSTPORT
#Parallel config
# number of nodes that will receive kafka input
hibench.streamingbench.receiver_nodes 4

# Set the starting offset of kafkaConsumer (default: largest)
hibench.streambench.kafka.offsetReset largest
#########################################################
# Data Generator Config
#########################################################
###############
#Benchmark args
#Note: ensure benchName is consistent with the datagen type (numeric data for statistics, text data for the others)
# available benchName values: identity, sample, project, grep, wordcount, distinctcount, statistics

# Interval span in millisecond (default: 50)
hibench.streambench.datagen.intervalSpan 50
hibench.streamingbench.benchname identity

# Number of records to generate per interval span (default: 5)
hibench.streambench.datagen.recordsPerInterval 5
#common args
# the topic from which Spark will receive input data
hibench.streamingbench.topic_name ${hibench.streamingbench.benchname}

# Number of total records that will be generated (default: -1 means infinity)
hibench.streambench.datagen.totalRecords -1
# Spark Streaming batch interval (in seconds)
hibench.streamingbench.batch_interval 10

# Total number of rounds of data sent (default: -1 means infinite)
hibench.streambench.datagen.totalRounds -1
# consumer group of the spark consumer for kafka
hibench.streamingbench.consumer_group HiBench

# default path to store seed files (default: ${hibench.hdfs.data.dir}/Streaming)
hibench.streambench.datagen.dir ${hibench.hdfs.data.dir}/Streaming
# expected number of records to be processed
hibench.streamingbench.record_count 900000000

# fixed length of record (default: 200)
hibench.streambench.datagen.recordLength 200
#sketch/distinctcount/statistics arg
# the field index of the record that will be extracted
hibench.streamingbench.field_index 1

# Number of KafkaProducer instances, each running on its own thread (default: 1)
# A single KafkaProducer is limited to about 100 Mb/s
hibench.streambench.datagen.producerNumber 1
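Given the per-producer limit noted above, producerNumber can be sized against a target ingest rate; a back-of-envelope sketch with a purely illustrative target:

import math

target_rate_mb_s = 250   # hypothetical required ingest rate, same units as the comment above
per_producer_mb_s = 100  # approximate single-producer limit
print(int(math.ceil(target_rate_mb_s / float(per_producer_mb_s))))  # -> 3 producers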
#sketch/wordcount/distinctcount/statistics arg
# the separator between fields of a single record
hibench.streamingbench.separator \\s+
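To see how the restored separator default (\s+) combines with field_index, here is a small sketch (the record contents are made up):

import re

record = "1468587295 user_7 page_3"        # hypothetical input record
fields = re.split(r"\s+", record.strip())  # split on runs of whitespace
print(fields[1])                           # field_index 1 -> user_7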

hibench.streambench.fixWindowDuration 30000
#sample arg
# probability that a record will be taken as a sample
hibench.streamingbench.prob 0.1

hibench.streambench.fixWindowSlideStep 30000
#########################################################
# Spark Streaming Config
#########################################################
#grep arg
# the substring that each record is checked to contain
hibench.streamingbench.pattern the

# Number of nodes that will receive kafka input (default: 4)
hibench.streambench.spark.receiverNumber 4
#common arg
# indicate RDD storage level:
# 1 = memory only with 1 copy; any other value = the default mem_disk_ser with 2 copies
hibench.streamingbench.copies 2

# Spark Streaming batch interval in milliseconds (default: 100)
hibench.streambench.spark.batchInterval 100
# indicate whether to test the new write-ahead log (WAL) feature
# set true to test the WAL feature
hibench.streamingbench.testWAL false

# Indicate RDD storage level. (default: 2)
# 0 = StorageLevel.MEMORY_ONLY
# 1 = StorageLevel.MEMORY_AND_DISK_SER
# other = StorageLevel.MEMORY_AND_DISK_SER_2
hibench.streambench.spark.storageLevel 2
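A plain-Python sketch of the code-to-storage-level convention spelled out in the comment above (an assumed decoding for illustration, not the benchmark's actual code):

STORAGE_LEVELS = {
    0: "MEMORY_ONLY",
    1: "MEMORY_AND_DISK_SER",
}

def to_storage_level(code):
    # any value other than 0 or 1 falls back to the 2-copy default
    return STORAGE_LEVELS.get(code, "MEMORY_AND_DISK_SER_2")

print(to_storage_level(2))  # -> MEMORY_AND_DISK_SER_2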
# if testWAL is true, an HDFS path for storing the stream context must be specified; if false, it can be left empty
hibench.streamingbench.checkpoint_path

# indicate whether to test the write ahead log new feature (default: false)
hibench.streambench.spark.enableWAL false
#common arg
# indicate whether debug mode is enabled for correctness verification
hibench.streamingbench.debug false

# if testWAL is true, an HDFS path for storing the stream context must be specified; if false, it can be left empty (default: /var/tmp)
hibench.streambench.spark.checkpointPath /var/tmp
# whether to use the direct approach or not (Spark Streaming only)
hibench.streamingbench.direct_mode true

# whether to use the direct approach or not (default: true)
hibench.streambench.spark.useDirectMode true
# Kafka broker list, used for direct mode, written in the form "host:port,host:port,..."

#########################################################
# Flink Config
#########################################################
hibench.streambench.flink.home /PATH/TO/FLINK/HOME
# example: hostname:9092
hibench.streamingbench.brokerList HOSTNAME:HOSTPORT

# default parallelism of flink job
hibench.streambench.flink.parallelism 20
hibench.streamingbench.broker_list_with_quote "${hibench.streamingbench.brokerList}"

hibench.streambench.flink.bufferTimeout 5
# storm bench conf

hibench.streambench.flink.checkpointDuration 1000
# STORM_BIN_HOME
hibench.streamingbench.storm.home /PATH/TO/STORM/HOME

#########################################################
# Storm Config
#########################################################
# Kafka home
hibench.streamingbench.kafka.home /PATH/TO/KAFKA/HOME

# STORM_BIN_HOME
hibench.streambench.storm.home /PATH/TO/STORM/HOME

#Cluster config
# nimbus of storm cluster
hibench.streambench.storm.nimbus HOSTNAME_OF_STORM_NIMBUS
hibench.streambench.storm.nimbusAPIPort 6627
hibench.streamingbench.storm.nimbus HOSTNAME_OF_STORM
hibench.streamingbench.storm.nimbusAPIPort 6627

# time interval at which nimbus is contacted to judge whether the job has finished
hibench.streambench.storm.nimbusContactInterval 10
hibench.streamingbench.storm.nimbusContactInterval 10


#Parallel config

# number of Storm workers; the thread count of most bolts is also equal to this param
hibench.streambench.storm.worker_count 12
hibench.streamingbench.storm.worker_count 12

# number of kafka spout threads of Storm
hibench.streambench.storm.spout_threads 12
hibench.streamingbench.storm.spout_threads 12

# number of bolt threads altogether
hibench.streambench.storm.bolt_threads 12
hibench.streamingbench.storm.bolt_threads 12

# Kafka arg indicating whether to read data from Kafka from the start or to continue from the last position
hibench.streambench.storm.read_from_start true
hibench.streamingbench.storm.read_from_start true

# whether to turn on ack
hibench.streambench.storm.ackon true

#########################################################
# Gearpump Config
#########################################################

hibench.streambench.gearpump.home /PATH/TO/GEARPUMP/HOME

hibench.streambench.gearpump.executors 1

hibench.streambench.gearpump.parallelism 1
hibench.streamingbench.storm.ackon true

# Added for default rules:
hibench.streamingbench.jars ${hibench.streamingbench.sparkbench.jar}
hibench.streamingbench.sparkbench.jar ${hibench.home}/src/streambench/sparkbench/target/streaming-bench-spark_0.1-5.0-SNAPSHOT-${hibench.spark.version}-jar-with-dependencies.jar
hibench.streamingbench.stormbench.jar ${hibench.home}/src/streambench/stormbench/target/streaming-bench-storm-0.1-SNAPSHOT-jar-with-dependencies.jar
hibench.streamingbench.datagen.jar ${hibench.home}/src/streambench/datagen/target/datagen-0.0.1-jar-with-dependencies.jar
hibench.streamingbench.storm.bin ${hibench.streamingbench.storm.home}/bin
hibench.streamingbench.zkhelper.jar ${hibench.home}/src/streambench/zkHelper/target/streaming-bench-zkhelper-0.1-SNAPSHOT-jar-with-dependencies.jar

# default path for storing data1 & data2
hibench.streamingbench.datagen.dir ${hibench.hdfs.data.dir}/Streaming

# partition count setting
hibench.streamingbench.partitions 1