Skip to content

Commit

Permalink
[WIP][CELEBORN-1540] Support tez (apache#2709)
Browse files Browse the repository at this point in the history
Co-authored-by: hongguangwei <[email protected]>
Co-authored-by: dujunling <[email protected]>
Co-authored-by: mingji <[email protected]>
  • Loading branch information
4 people authored Sep 5, 2024
1 parent ce0fee6 commit 8f945f4
Show file tree
Hide file tree
Showing 45 changed files with 9,592 additions and 56 deletions.
30 changes: 30 additions & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,33 @@ jobs:
name: mr-unit-test-log
path: |
**/target/test/
tez:
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
java:
- 8
steps:
- uses: actions/checkout@v2
- name: Setup JDK ${{ matrix.java }}
uses: actions/setup-java@v2
with:
distribution: zulu
java-version: ${{ matrix.java }}
cache: maven
check-latest: false
- name: Test with Maven
run: |
PROFILES="-Pgoogle-mirror,tez"
TEST_MODULES="client-tez/tez,client-tez/tez-shaded,tests/tez-it"
build/mvn $PROFILES -pl $TEST_MODULES -am clean install -DskipTests
build/mvn $PROFILES -pl $TEST_MODULES test
- name: Upload test log
if: failure()
uses: actions/upload-artifact@v3
with:
name: tez-unit-test-log
path: |
**/target/test/
65 changes: 65 additions & 0 deletions assets/tez-patch/for_local_test.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
Subject: [PATCH] for local test
---
Index: tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java (revision 72977b8720b2337ab0a0a3bf3b12e1c57900fa69)
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java (date 1723795814592)
@@ -48,12 +48,10 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.AsyncDispatcher;
+import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
-import org.apache.tez.dag.api.DAGSubmissionTimedOut;
-import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.*;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGClientHandler;
import org.apache.tez.dag.api.client.DAGClientImpl;
@@ -87,6 +85,7 @@
private TezApiVersionInfo versionInfo = new TezApiVersionInfo();
private volatile Throwable amFailException = null;
private boolean isLocalWithoutNetwork;
+ private String appMasterName;

private static final String localModeDAGSchedulerClassName =
"org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled";
@@ -109,6 +108,9 @@
this.isLocalWithoutNetwork =
tezConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE_WITHOUT_NETWORK,
TezConfiguration.TEZ_LOCAL_MODE_WITHOUT_NETWORK_DEFAULT);
+
+ this.appMasterName =
+ tezConf.get(TezConfiguration.TEZ_PREFIX + "appmaster.class.name");
}


@@ -393,6 +395,21 @@
TezUtilsInternal.readUserSpecifiedTezConfiguration(userDir)
.getAmPluginDescriptor();

+ if (appMasterName != null) {
+ Class<?>[] classes = new Class[] {ApplicationAttemptId.class, ContainerId.class,
+ String.class, int.class, int.class, Clock.class, long.class, boolean.class,
+ String.class, String[].class, String[].class, String.class, Credentials.class,
+ String.class, AMPluginDescriptorProto.class};
+ Object[] args = new Object[] {applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
+ SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs,
+ versionInfo.getVersion(), credentials, jobUserName, amPluginDescriptorProto};
+ try {
+ return ReflectionUtils.createClazzInstance(appMasterName, classes, args);
+ } catch (TezReflectionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
return isLocalWithoutNetwork
? new LocalDAGAppMaster(applicationAttemptId, cId, currentHost, nmPort, nmHttpPort,
SystemClock.getInstance(), appSubmitTime, isSession, userDir, localDirs, logDirs,
39 changes: 31 additions & 8 deletions client-tez/tez-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,25 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/resources/assembly.xml</descriptor>
</descriptors>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<goals>
<goal>single</goal>
</goals>
<phase>package</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
Expand All @@ -55,22 +74,26 @@
<pattern>io.netty</pattern>
<shadedPattern>${shading.prefix}.io.netty</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>${shading.prefix}.org.apache.commons</shadedPattern>
</relocation>
<relocation>
<pattern>org.scala-lang</pattern>
<shadedPattern>${shading.prefix}.org.scala-lang</shadedPattern>
</relocation>
<relocation>
<pattern>org.lz4</pattern>
<shadedPattern>${shading.prefix}.org.lz4</shadedPattern>
</relocation>
<relocation>
<pattern>org.roaringbitmap</pattern>
<shadedPattern>${shading.prefix}.org.roaringbitmap</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons.lang3</pattern>
<shadedPattern>${shading.prefix}.org.apache.commons.lang3</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads</pattern>
<shadedPattern>${shading.prefix}.org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.tez.common.TezCommonUtils</pattern>
<shadedPattern>${shading.prefix}.org.apache.tez.common.TezCommonUtils</shadedPattern>
</relocation>
</relocations>
<artifactSet>
<includes>
Expand Down
46 changes: 46 additions & 0 deletions client-tez/tez-shaded/src/main/resources/assembly.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@

<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0
http://maven.apache.org/xsd/assembly-2.0.0.xsd http://maven.apache.org/ASSEMBLY/2.0.0 ">
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<!-- see https://maven.apache.org/plugins/maven-assembly-plugin/assembly -->

<id>fat</id>

<formats>
<format>jar</format>
</formats>

<includeBaseDirectory>false</includeBaseDirectory>

<dependencySets>
<dependencySet>
<useProjectArtifact>true</useProjectArtifact>
<unpack>true</unpack>
<scope>runtime</scope>
<includes>
<include>org.apache.celeborn:*</include>
<include>com.google.protobuf:*</include>
<include>org.apache.tez:tez-runtime-library</include>
<include>org.apache.tez:tez-api</include>
</includes>
</dependencySet>
</dependencySets>
</assembly>
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.client;

import java.io.IOException;
import java.util.concurrent.atomic.LongAdder;

import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.celeborn.client.write.DataPusher;
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.identity.UserIdentifier;

public class CelebornTezWriter {
private final Logger logger = LoggerFactory.getLogger(CelebornTezWriter.class);

private final ShuffleClient shuffleClient;
private DataPusher dataPusher;
private final int shuffleId;
private final int mapId;
private final int attemptNumber;
private final int numMappers;
private final int numPartitions;

public CelebornTezWriter(
int shuffleId,
int mapId,
int attemptNumber,
long taskAttemptId,
int numMappers,
int numPartitions,
CelebornConf conf,
String appUniqueId,
String lifecycleManagerHost,
int lifecycleManagerPort,
UserIdentifier userIdentifier) {
shuffleClient =
ShuffleClient.get(
appUniqueId, lifecycleManagerHost, lifecycleManagerPort, conf, userIdentifier, null);
// TEZ_SHUFFLE_ID
this.shuffleId = shuffleId;
this.mapId = mapId;
this.attemptNumber = attemptNumber;
this.numMappers = numMappers;
this.numPartitions = numPartitions;

LongAdder[] mapStatusLengths = new LongAdder[numPartitions];
for (int i = 0; i < numPartitions; i++) {
mapStatusLengths[i] = new LongAdder();
}
try {
dataPusher =
new DataPusher(
shuffleId,
mapId,
attemptNumber,
taskAttemptId,
numMappers,
numPartitions,
conf,
shuffleClient,
null,
integer -> {},
mapStatusLengths);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

public void pushData(int partitionId, byte[] dataBuf, int size) throws IOException {
try {
dataPusher.addTask(partitionId, dataBuf, size);
} catch (InterruptedException e) {
throw new IOInterruptedException(e);
}
}

public void mergeData(int partitionId, byte[] dataBuf, int size) throws IOException {
int bytesWritten =
shuffleClient.mergeData(
shuffleId,
mapId,
attemptNumber,
partitionId,
dataBuf,
0,
size,
numMappers,
numPartitions);
}

public void close() throws IOException {
logger.info(
"Call mapper end shuffleId:{} mapId:{} attemptId:{} numMappers:{}",
0,
mapId,
attemptNumber,
numMappers);
try {
dataPusher.waitOnTermination();
shuffleClient.pushMergedData(shuffleId, mapId, attemptNumber);
shuffleClient.mapperEnd(shuffleId, mapId, attemptNumber, numMappers);
} catch (InterruptedException e) {
throw new IOInterruptedException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@
import java.lang.reflect.Field;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;

import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.exception.CelebornRuntimeException;

public class TezUtils {
public class CelebornTezUtils {
public static final String TEZ_PREFIX = "tez.";
public static final String TEZ_CELEBORN_LM_HOST = "celeborn.lifecycleManager.host";
public static final String TEZ_CELEBORN_LM_PORT = "celeborn.lifecycleManager.port";
public static final String TEZ_CELEBORN_USER = "celeborn.lifecycleManager.user";
public static final String TEZ_CELEBORN_APPLICATION_ID = "celeborn.applicationId";
public static final String TEZ_SHUFFLE_ID = "celeborn.tez.shuffle.id";
public static final String TEZ_BROADCAST_OR_ONETOONE = "celeborn.tez.broadcastOrOneToOne";

public static final CelebornConf fromTezConfiguration(Configuration tezConfig) {
CelebornConf tmpCelebornConf = new CelebornConf();
Expand All @@ -56,6 +59,14 @@ public static Object getPrivateField(Object object, String name) {
}
}

public static String uniqueIdentifierToAttemptId(String uniqueIdentifier) {
if (uniqueIdentifier == null) {
throw new CelebornRuntimeException("uniqueIdentifier should not be null");
}
String[] ids = uniqueIdentifier.split("_");
return StringUtils.join(ids, "_", 0, 7);
}

public static String ensureGetSysEnv(String envName) throws IOException {
String value = System.getenv(envName);
if (value == null) {
Expand Down
Loading

0 comments on commit 8f945f4

Please sign in to comment.