Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TEZ fixes #2

Open
wants to merge 6 commits into
base: addons
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions app-conf/FetcherConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@
</fetcher>
-->
<fetchers>
<!--

<fetcher>
<applicationtype>mapreduce</applicationtype>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comments to explain what u have done..

<classname>com.linkedin.drelephant.mapreduce.fetchers.MapReduceFetcherHadoop2</classname>
<params>
<sampling_enabled>false</sampling_enabled>
</params>
</fetcher>
-->

<!--
This is a replacement for the MapReduceFetcherHadoop2 that attempts to burn
through queues of jobs faster by pulling data directly from HDFS rather than going through
Expand All @@ -51,7 +51,7 @@
To work properly, this fetcher should use the same timezone with the job history server.
If not set, the local timezone will be used.
-->

<!--
<fetcher>
<applicationtype>mapreduce</applicationtype>
<classname>com.linkedin.drelephant.mapreduce.fetchers.MapReduceFSFetcherHadoop2</classname>
Expand All @@ -62,7 +62,7 @@
</params>
</fetcher>


-->
<!--
FSFetcher for Spark. Loads the eventlog from HDFS and replays to get the metrics and application properties

Expand Down
2 changes: 1 addition & 1 deletion app-conf/GeneralConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
</property>
<property>
<name>drelephant.executor.service.class.name</name>
<value>com.linkedin.drelephant.executors.QuartzExecutorService</value>
<value>com.linkedin.drelephant.executors.ThreadPoolExecutorService</value>
<description>Executor service (can be one of ThreadPoolExecutorService or QuartzExecutorService)</description>
</property>
<!--
Expand Down
43 changes: 42 additions & 1 deletion app/com/linkedin/drelephant/analysis/AnalyticJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import models.AppHeuristicResult;
import models.AppHeuristicResultDetails;
import models.AppResult;
import models.FailedAppResult;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.log4j.Logger;


Expand Down Expand Up @@ -249,6 +251,40 @@ public AnalyticJob setTrackingUrl(String trackingUrl) {
return this;
}


public FailedAppResult getFailedAppResult(Exception e) throws Exception {

ElephantFetcher fetcher = ElephantContext.instance().getFetcherForApplicationType(getAppType());
HadoopApplicationData data = fetcher.fetchConfData(this);

JobType jobType = ElephantContext.instance().matchJobType(data);
String jobTypeName = jobType == null ? UNKNOWN_JOB_TYPE : jobType.getName();

AppResult result = new AppResult();
InfoExtractor.loadInfo(result, data);

FailedAppResult failedApp = new FailedAppResult();
failedApp.appId = Utils.truncateField(getAppId(), AppResult.ID_LIMIT, getAppId());
failedApp.startTime = getStartTime();
failedApp.finishTime = getFinishTime();
failedApp.name = Utils.truncateField(getName(), AppResult.APP_NAME_LIMIT, getAppId());
failedApp.trackingUrl = Utils.truncateField(getTrackingUrl(), AppResult.TRACKING_URL_LIMIT, getAppId());
failedApp.jobType = Utils.truncateField(jobTypeName, AppResult.JOBTYPE_LIMIT, getAppId());
failedApp.scheduler = result.scheduler;
failedApp.jobName =result.jobName;
failedApp.jobDefId = result.jobDefId;
failedApp.jobExecId = result.jobExecId;
failedApp.flowDefId = result.flowDefId;
failedApp.flowExecId = result.flowExecId;
failedApp.jobDefUrl = result.jobDefUrl;
failedApp.jobExecUrl = result.jobExecUrl;
failedApp.flowDefId =result.flowDefId;
failedApp.flowExecUrl = result.jobExecUrl;
failedApp.error = ExceptionUtils.getStackTrace(e);

return failedApp;
}

/**
* Returns the analysed AppResult that could be directly serialized into DB.
*
Expand Down Expand Up @@ -360,6 +396,11 @@ public boolean isSecondPhaseRetry(){
* @return true if should retry, else false
*/
public boolean isPrimaryPhaseRetry() {
return (_retries++) < _RETRY_LIMIT;
if(_retries < _RETRY_LIMIT) {
_retries++;
return true;
}
// Not incrementing _retries if condition fails so as to get correct value of total retries in case of QuartzScheduling.
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.*;
import models.AppResult;
import models.CheckPoint;
import models.FailedAppResult;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
Expand Down Expand Up @@ -244,6 +245,14 @@ public void analyseJob(AnalyticJob analyticJob) {
} else {
if (analyticJob != null) {
MetricsController.triggerJobRetriesExhaustionEvent();
try {
FailedAppResult result = analyticJob.getFailedAppResult(e);
result.save();

} catch (Exception ex) {
logger.error("Failed to get info for failed app: " + analyticJob.getAppId());
logger.error(ExceptionUtils.getStackTrace(ex));
}
logger.error("Drop the analytic job. Reason: reached the max retries for application id = ["
+ analyticJob.getAppId() + "].");
}
Expand Down
3 changes: 3 additions & 0 deletions app/com/linkedin/drelephant/analysis/ElephantFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ public interface ElephantFetcher<T extends HadoopApplicationData> {
*/
public T fetchData(AnalyticJob job)
throws Exception;

public T fetchConfData(AnalyticJob job);

}
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ public void execute(AnalyticJob analyticJob) {
logger.error("job already exist with app_id: "+ analyticJob.getAppId());
} catch (SchedulerException e) {
throw new RuntimeException("Error while setting up scheduler : ", e);
} finally {
interval = 0;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,11 @@ private DataFiles getHistoryFiles(AnalyticJob job) throws IOException {
return new DataFiles(jobConfPath, jobHistPath);
}

@Override
public MapReduceApplicationData fetchConfData(AnalyticJob analyticJob) {
return new MapReduceApplicationData();
}

@Override
public MapReduceApplicationData fetchData(AnalyticJob job) throws IOException {
DataFiles files = getHistoryFiles(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,32 @@ public MapReduceFetcherHadoop2(FetcherConfigurationData fetcherConfData) throws
}

@Override
public MapReduceApplicationData fetchData(AnalyticJob analyticJob) throws IOException, AuthenticationException {
public MapReduceApplicationData fetchConfData(AnalyticJob analyticJob) {
String appId = analyticJob.getAppId();
MapReduceApplicationData jobData = new MapReduceApplicationData();
String jobId = Utils.getJobIdFromApplicationId(appId);
jobData.setAppId(appId).setJobId(jobId);
// Change job tracking url to job history page
analyticJob.setTrackingUrl(_jhistoryWebAddr + jobId);
try {

// Fetch job config
try {
Properties jobConf = _jsonFactory.getProperties(_urlFactory.getJobConfigURL(jobId));
jobData.setJobConf(jobConf);
} catch (Exception e) {
logger.error("Failed to fetch conf data: ", e);
}
return jobData;
}

@Override
public MapReduceApplicationData fetchData(AnalyticJob analyticJob) throws IOException, AuthenticationException {

String appId = analyticJob.getAppId();
String jobId = Utils.getJobIdFromApplicationId(appId);
MapReduceApplicationData jobData = fetchConfData(analyticJob);

try {
URL jobURL = _urlFactory.getJobURL(jobId);
String state = _jsonFactory.getState(jobURL);

Expand Down
2 changes: 2 additions & 0 deletions app/com/linkedin/drelephant/spark/fetchers/FSFetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class FSFetcher(fetcherConfigurationData: FetcherConfigurationData)
extends ElephantFetcher[SparkApplicationData] {
lazy val legacyFetcher = new SparkFSFetcher(fetcherConfigurationData)

override def fetchConfData(job: AnalyticJob): SparkApplicationData = ???

override def fetchData(analyticJob: AnalyticJob): SparkApplicationData = {
val legacyData = legacyFetcher.fetchData(analyticJob)
LegacyDataConverters.convert(legacyData)
Expand Down
2 changes: 2 additions & 0 deletions app/com/linkedin/drelephant/spark/fetchers/SparkFetcher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class SparkFetcher(fetcherConfigurationData: FetcherConfigurationData)
new SparkLogClient(hadoopConfiguration, sparkConf, eventLogUri)
}

override def fetchConfData(job: AnalyticJob): SparkApplicationData = ???

override def fetchData(analyticJob: AnalyticJob): SparkApplicationData = {
doFetchData(analyticJob) match {
case Success(data) => data
Expand Down
Loading