Skip to content

Commit

Permalink
Merge branch '6.0withMaster'
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonwang committed Nov 8, 2016
2 parents 1f49a4f + 6b02cac commit cb633ac
Show file tree
Hide file tree
Showing 560 changed files with 108,095 additions and 52,112 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,5 @@ report/
.classpath
.project
.settings/
conf/hadoop.conf
conf/spark.conf
96 changes: 0 additions & 96 deletions Base_SCSReportforHiBench.html

This file was deleted.

Binary file removed WISS10_conf_full_011.pdf
Binary file not shown.
83 changes: 83 additions & 0 deletions autogen/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<!-- Maven build descriptor for the HiBench "autogen" module: data generation tools. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Inherits version and property management (mahout.version, hadoop.mr2.version, ...)
     from the top-level hibench parent POM. -->
<parent>
<groupId>com.intel.hibench</groupId>
<artifactId>hibench</artifactId>
<version>6.0-SNAPSHOT</version>
</parent>

<artifactId>autogen</artifactId>
<packaging>jar</packaging>
<groupId>com.intel.hibench</groupId>
<name>HiBench data generation tools</name>

<dependencies>
<!-- Shared HiBench utilities (config loading, common constants). -->
<dependency>
<groupId>com.intel.hibench</groupId>
<artifactId>hibench-common</artifactId>
<version>${project.version}</version>
</dependency>
<!-- Mahout is used for data generation; exclude its transitive hadoop-core
     so the Hadoop version is controlled by the explicit hadoop-* deps below. -->
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-core</artifactId>
<version>${mahout.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-math</artifactId>
<version>${mahout.version}</version>
</dependency>
<dependency>
<groupId>org.uncommons.maths</groupId>
<artifactId>uncommons-maths</artifactId>
<version>${uncommons-maths.version}</version>
</dependency>
<!-- NOTE(review): version is hardcoded here while sibling dependencies use
     ${...} properties from the parent POM; consider introducing a
     ${kafka.version} property if the parent defines (or can define) one. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.8.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.mr2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.mr2.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Produce a self-contained "jar-with-dependencies" during `package`,
     so the generator can be launched without assembling a classpath. -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>${maven-assembly-plugin.version}</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,11 @@ private void createRankingsTableDirectly() throws IOException, URISyntaxExceptio
job.setInputFormat(NLineInputFormat.class);
FileInputFormat.setInputPaths(job, dummy.getPath());

job.set("mapred.map.output.compression.type", "BLOCK");
job.set("mapreduce.output.fileoutputformat.compress.type","BLOCK");
MapFileOutputFormat.setCompressOutput(job, true);
// job.set("mapred.map.output.compression.type", "BLOCK");
// job.set("mapreduce.output.fileoutputformat.compress.type","BLOCK");
// MapFileOutputFormat.setCompressOutput(job, true);
// MapFileOutputFormat.setOutputCompressorClass(job, org.apache.hadoop.io.compress.LzoCodec.class);
MapFileOutputFormat.setOutputCompressorClass(job, org.apache.hadoop.io.compress.DefaultCodec.class);
// MapFileOutputFormat.setOutputCompressorClass(job, org.apache.hadoop.io.compress.DefaultCodec.class);

if (options.isSequenceOut()) {
job.setOutputFormat(SequenceFileOutputFormat.class);
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Locale;
import java.util.Random;
Expand All @@ -17,7 +19,7 @@ public class RawData {

private static Random rand = new Random(11);

private static String dict = "/usr/share/dict/words";
private static String dict = "/words";
private static int numSourceWords = 1000;
private static int numSourceUAgents = 2000;

Expand Down Expand Up @@ -116,18 +118,16 @@ public static void createSearchKeys(Path hdfs_searchkeys) throws IOException {

FileSystem fs = hdfs_searchkeys.getFileSystem(new Configuration());
FSDataOutputStream fout = fs.create(hdfs_searchkeys);

File fdict = new File(dict);
InputStream is=new RawData().getClass().getResourceAsStream(dict);
int len = 0;
if (fdict.exists()) {

FileReader fr = new FileReader(fdict);
BufferedReader br = new BufferedReader(fr);
if (is!=null) {
InputStreamReader isr=new InputStreamReader(is);
BufferedReader br = new BufferedReader(isr);
while (null != br.readLine()) {
len++;
}
br.close();
fr.close();

int[] wids = new int[numSourceWords];
for (int i=0; i<numSourceWords; i++) {
Expand All @@ -136,9 +136,9 @@ public static void createSearchKeys(Path hdfs_searchkeys) throws IOException {
Arrays.sort(wids);

int i=0, j=0;
File newfdict = new File(dict);
FileReader newfr = new FileReader(newfdict);
BufferedReader newbr = new BufferedReader(newfr);
InputStream newis=new RawData().getClass().getResourceAsStream(dict);
InputStreamReader newisr=new InputStreamReader(newis);
BufferedReader newbr = new BufferedReader(newisr);
while ((i<wids.length) && (j<len)) {
String wd = newbr.readLine();
if (j==wids[i]) {
Expand All @@ -151,7 +151,6 @@ public static void createSearchKeys(Path hdfs_searchkeys) throws IOException {
j++;
}
newbr.close();
newfr.close();
} else {
for (int i=0; i<numSourceWords; i++) {
String wd = nextSeedWord() + "\n";
Expand Down Expand Up @@ -193,12 +192,12 @@ public static int putDictToHdfs(Path hdfs_dict, int size) throws IOException {
FileSystem fs = hdfs_dict.getFileSystem(new Configuration());
FSDataOutputStream fout = fs.create(hdfs_dict);

File fdict = new File(dict);
InputStream is=new RawData().getClass().getResourceAsStream(dict);
int len = 0;
if (fdict.exists()) {
if (is!=null) {

FileReader fr = new FileReader(fdict);
BufferedReader br = new BufferedReader(fr);
InputStreamReader isr=new InputStreamReader(is);
BufferedReader br = new BufferedReader(isr);

while (len < size) {
String word = br.readLine() + "\n";
Expand All @@ -208,9 +207,8 @@ public static int putDictToHdfs(Path hdfs_dict, int size) throws IOException {
len++;
}
br.close();
fr.close();
}
fout.close();
return len;
}
}
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.hibench.datagen.streaming;

import com.intel.hibench.common.HiBenchConfig;
import com.intel.hibench.common.streaming.ConfigLoader;
import com.intel.hibench.common.streaming.StreamBenchConfig;
import com.intel.hibench.datagen.streaming.util.DataGeneratorConfig;
import com.intel.hibench.datagen.streaming.util.KafkaSender;
import com.intel.hibench.datagen.streaming.util.RecordSendTask;

import java.util.Timer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Command-line entry point that generates streaming-benchmark records and
 * pushes them to Kafka.
 *
 * <p>It spawns a configurable number of producer threads; each thread builds a
 * {@link KafkaSender} over a seed file and schedules a periodic
 * {@link RecordSendTask} on its own {@link Timer}.
 *
 * <p>Expected arguments:
 * <pre>
 *   args[0]  path to the HiBench streaming config file
 *   args[1]  seed file for user-visit records
 *   args[2]  byte offset into the user-visits file
 *   args[3]  seed file for k-means records
 *   args[4]  byte offset into the k-means file
 * </pre>
 */
public class DataGenerator {

  public static void main(String[] args) {
    if (args.length < 5) {
      System.err.println("args: <ConfigFile> <userVisitsFile> <userVisitsFileOffset> <kMeansFile> <kMeansFileOffset> need to be specified!");
      System.exit(1);
    }

    // Initialize variables from configuration and input parameters.
    ConfigLoader configLoader = new ConfigLoader(args[0]);

    String userVisitsFile = args[1];
    long userVisitsFileOffset = Long.parseLong(args[2]);
    String kMeansFile = args[3];
    long kMeansFileOffset = Long.parseLong(args[4]);

    // Load properties from the config file.
    String testCase = configLoader.getProperty(StreamBenchConfig.TESTCASE).toLowerCase();
    String topic = configLoader.getProperty(StreamBenchConfig.KAFKA_TOPIC);
    String brokerList = configLoader.getProperty(StreamBenchConfig.KAFKA_BROKER_LIST);
    int intervalSpan = Integer.parseInt(configLoader.getProperty(StreamBenchConfig.DATAGEN_INTERVAL_SPAN));
    long recordsPerInterval = Long.parseLong(configLoader.getProperty(StreamBenchConfig.DATAGEN_RECORDS_PRE_INTERVAL));
    long totalRecords = Long.parseLong(configLoader.getProperty(StreamBenchConfig.DATAGEN_TOTAL_RECORDS));
    int totalRounds = Integer.parseInt(configLoader.getProperty(StreamBenchConfig.DATAGEN_TOTAL_ROUNDS));
    int recordLength = Integer.parseInt(configLoader.getProperty(StreamBenchConfig.DATAGEN_RECORD_LENGTH));
    String dfsMaster = configLoader.getProperty(HiBenchConfig.DFS_MASTER);
    // BUG FIX: Boolean.getBoolean(name) looks up a JVM *system property* named
    // `name` and parses THAT; it never parses the string passed in. With the
    // original code debugMode was always false unless a system property whose
    // name happened to equal the configured value existed. parseBoolean parses
    // the configured value itself, which is what is intended here.
    boolean debugMode = Boolean.parseBoolean(configLoader.getProperty(StreamBenchConfig.DEBUG_MODE));

    DataGeneratorConfig dataGeneratorConf = new DataGeneratorConfig(testCase, brokerList, kMeansFile, kMeansFileOffset,
        userVisitsFile, userVisitsFileOffset, dfsMaster, recordLength, intervalSpan, topic, recordsPerInterval,
        totalRounds, totalRecords, debugMode);

    // Create a thread pool and submit one producer task per configured producer.
    int producerNumber = Integer.parseInt(configLoader.getProperty(StreamBenchConfig.DATAGEN_PRODUCER_NUMBER));
    ExecutorService pool = Executors.newFixedThreadPool(producerNumber);
    for (int i = 0; i < producerNumber; i++) {
      pool.execute(new DataGeneratorJob(dataGeneratorConf));
    }

    // Print out some basic information about this run.
    System.out.println("============ StreamBench Data Generator ============");
    System.out.println(" Interval Span : " + intervalSpan + " ms");
    System.out.println(" Record Per Interval : " + recordsPerInterval);
    System.out.println(" Record Length : " + recordLength + " bytes");
    System.out.println(" Producer Number : " + producerNumber);
    // -1 is the sentinel for "no limit" on records/rounds.
    if (totalRecords == -1) {
      System.out.println(" Total Records : -1 [Infinity]");
    } else {
      System.out.println(" Total Records : " + totalRecords);
    }

    if (totalRounds == -1) {
      System.out.println(" Total Rounds : -1 [Infinity]");
    } else {
      System.out.println(" Total Rounds : " + totalRounds);
    }
    System.out.println(" Kafka Topic : " + topic);
    System.out.println("====================================================");
    System.out.println("Estimated Speed : ");
    long recordsPerSecond = recordsPerInterval * 1000 * producerNumber / intervalSpan;
    System.out.println(" " + recordsPerSecond + " records/second");
    double mbPerSecond = (double) recordsPerSecond * recordLength / 1000000;
    System.out.println(" " + mbPerSecond + " Mb/second");
    System.out.println("====================================================");

    // No new jobs will be accepted; already-submitted producer jobs run to
    // completion (their Timers are non-daemon and keep the JVM alive).
    pool.shutdown();
  }

  /**
   * One producer job: constructs a KafkaSender over the appropriate seed file
   * and schedules a fixed-rate RecordSendTask on a dedicated Timer.
   */
  static class DataGeneratorJob implements Runnable {
    DataGeneratorConfig conf;

    // Constructor
    public DataGeneratorJob(DataGeneratorConfig conf) {
      this.conf = conf;
    }

    @Override
    public void run() {
      // The "statistics" test case streams the k-means seed file; every other
      // test case streams the user-visits seed file.
      KafkaSender sender;
      if (conf.getTestCase().contains("statistics")) {
        sender = new KafkaSender(conf.getBrokerList(), conf.getkMeansFile(), conf.getkMeansFileOffset(),
            conf.getDfsMaster(), conf.getRecordLength(), conf.getIntervalSpan());
      } else {
        sender = new KafkaSender(conf.getBrokerList(), conf.getUserVisitsFile(), conf.getUserVisitsFileOffset(),
            conf.getDfsMaster(), conf.getRecordLength(), conf.getIntervalSpan());
      }

      // Schedule the periodic send task. The timer instance is handed to the
      // task; NOTE(review): presumably RecordSendTask cancels it once the
      // total-rounds/total-records limits are reached — confirm in that class.
      Timer timer = new Timer();
      timer.scheduleAtFixedRate(
          new RecordSendTask(sender, conf.getTopic(), conf.getRecordsPerInterval(),
              conf.getTotalRounds(), conf.getTotalRecords(), conf.getDebugMode(), timer), 0, conf.getIntervalSpan());
    }
  }
}
Loading

0 comments on commit cb633ac

Please sign in to comment.