Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-397] GraphAI Nearline Inference QuickStart #433

Merged
merged 11 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added data/InferUDF.zip
Binary file not shown.
385 changes: 385 additions & 0 deletions docs/docs-cn/source/3.quick_start/3.quick_start_infer&UDF.md

Large diffs are not rendered by default.

394 changes: 394 additions & 0 deletions docs/docs-en/source/3.quick_start/3.quick_start_infer&UDF.md

Large diffs are not rendered by default.

Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,15 @@

package com.antgroup.geaflow.infer;

import static com.antgroup.geaflow.infer.util.InferFileUtils.MODEL_FILE_EXTENSION;
import static com.antgroup.geaflow.infer.util.InferFileUtils.MODEL_NAME;
import static com.antgroup.geaflow.infer.util.InferFileUtils.PY_FILE_EXTENSION;
import static com.antgroup.geaflow.infer.util.InferFileUtils.REQUIREMENTS_TXT;
import static com.antgroup.geaflow.infer.util.InferFileUtils.getPythonFilesByCondition;

import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.infer.util.InferFileUtils;
import com.google.common.base.Preconditions;
import java.io.File;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.FilenameUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -92,36 +86,4 @@ private List<String> buildInferRuntimeFiles() {
}
return runtimeFiles;
}

private void prepareInferFiles(InferEnvironmentContext environmentContext) {
String pythonFilesDirectory = environmentContext.getInferFilesDirectory();
List<File> modelFile = getPythonFilesByCondition(pathname -> {
String fileName = pathname.getName();
String fileExtension = FilenameUtils.getExtension(fileName);
return pathname.isFile() && MODEL_FILE_EXTENSION.equals(fileExtension);
});

Preconditions.checkState(!modelFile.isEmpty(), "model(.pt) file is not exist, please upload model.pt file");
Preconditions.checkState(modelFile.size() == 1, "upload model.pt num more than 1");
InferFileUtils.copyPythonFile(pythonFilesDirectory, modelFile.get(0), MODEL_NAME);

List<File> requirementsFile = getPythonFilesByCondition(pathname -> {
String fileName = pathname.getName();
return pathname.isFile() && fileName.equals(REQUIREMENTS_TXT);
});

Preconditions.checkState(!requirementsFile.isEmpty(), "please upload requirements.txt "
+ "(build infer env) file");
Preconditions.checkState(requirementsFile.size() == 1, "upload requirements.txt num more than 1");
InferFileUtils.copyPythonFile(environmentContext.getVirtualEnvDirectory(), requirementsFile.get(0));

List<File> pythonFiles = getPythonFilesByCondition(pathname -> {
String fileName = pathname.getName();
String fileExtension = FilenameUtils.getExtension(fileName);
return pathname.isFile() && PY_FILE_EXTENSION.equals(fileExtension);
});
Preconditions.checkState(!pythonFiles.isEmpty(), "infer files is empty, please upload "
+ "infer files");
pythonFiles.forEach(f -> InferFileUtils.copyPythonFile(pythonFilesDirectory, f));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.antgroup.geaflow.infer.util.InferFileUtils;
import com.antgroup.geaflow.infer.util.ShellExecUtils;
import com.google.common.base.Joiner;

import java.io.File;
import java.nio.channels.FileLock;
import java.time.Duration;
Expand All @@ -33,6 +34,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,7 +44,15 @@ public class InferEnvironmentManager implements AutoCloseable {

private static final String LOCK_FILE = "_lock";

private static final String SHELL_START = "/bin/sh";
private static final String SHELL_START = "/bin/bash";

private static final long TIMEOUT_SECOND = 10;

private static final String SCRIPT_SEPARATOR = " ";

private static final String CHMOD_CMD = "chmod";

private static final String CHMOD_PERMISSION = "755";

private static final String FINISH_FILE = "_finish";

Expand Down Expand Up @@ -169,7 +179,18 @@ private boolean createInferVirtualEnv(InferDependencyManager dependencyManager,
shellCommand.addAll(execParams);
String cmd = Joiner.on(" ").join(shellCommand);
LOGGER.info("create infer virtual env {}", cmd);

// Run "chmod 755 $shellPath"
List<String> runCommands = new ArrayList<>();
runCommands.add(CHMOD_CMD);
runCommands.add(CHMOD_PERMISSION);
runCommands.add(shellPath);
String chmodCmd = Joiner.on(SCRIPT_SEPARATOR).join(runCommands);
LOGGER.info("change {} permission run command is {}", shellPath, chmodCmd);
int installEnvTimeOut = configuration.getInteger(FrameworkConfigKeys.INFER_ENV_INIT_TIMEOUT_SEC);
if (!ShellExecUtils.run(chmodCmd, Duration.ofSeconds(installEnvTimeOut), LOGGER::info, LOGGER::error)) {
return false;
}
return ShellExecUtils.run(cmd, Duration.ofSeconds(installEnvTimeOut), LOGGER::info, LOGGER::error, workingDir);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ public class InferFileUtils {

private static final int DEFAULT_BUFFER_SIZE = 1024;

public static final String MODEL_FILE_EXTENSION = ".pt";

public static final String MODEL_NAME = "model.pt";

public static final String REQUIREMENTS_TXT = "requirements.txt";

public static void releaseLock(FileLock fileLock) {
Expand Down Expand Up @@ -239,32 +235,21 @@ public static List<Path> getPathsFromResourceJAR(String folder) throws URISyntax
public static void prepareInferFilesFromJars(String targetDirectory) {
File userJobJarFile = getUserJobJarFile();
Preconditions.checkNotNull(userJobJarFile);
int modelFileNum = 0;
int requirementsFileNum = 0;
try {
JarFile jarFile = new JarFile(userJobJarFile);
Enumeration<JarEntry> entries = jarFile.entries();
while (entries.hasMoreElements()) {
JarEntry entry = entries.nextElement();
String entryName = entry.getName();
if (!entry.isDirectory()) {
if (entryName.endsWith(PY_FILE_EXTENSION)) {
String inferPythonFile = extractFile(targetDirectory, entryName, entry, jarFile);
LOGGER.info("cp infer python file {} to {} from jar file {}", entryName, inferPythonFile, userJobJarFile.getName());
} else if (entryName.endsWith(MODEL_FILE_EXTENSION)) {
modelFileNum++;
Preconditions.checkState(modelFileNum == 1, "upload infer "
+ "model file num more than 1");
String modelFilePath = extractFile(targetDirectory, MODEL_NAME, entry, jarFile);
LOGGER.info("cp infer model file {} to {} from jar file {}", entryName, modelFilePath, userJobJarFile.getName());
} else if (REQUIREMENTS_TXT.equals(entryName)) {
requirementsFileNum++;
Preconditions.checkState(requirementsFileNum == 1, "upload env "
+ "requirements file num more than 1");
String requirementsFilePath = extractFile(targetDirectory, entryName, entry, jarFile);
LOGGER.info("cp end requirements file {} to {} from jar file "
+ "{}", entryName, requirementsFilePath, userJobJarFile.getName());
String inferFile = extractFile(targetDirectory, entryName, entry, jarFile);
LOGGER.info("cp infer file {} to {} from jar file {}", entryName, inferFile, userJobJarFile.getName());
} else {
File entryDestination = new File(targetDirectory, entry.getName());
if (!entryDestination.exists()) {
entryDestination.mkdirs();
}
LOGGER.info("create infer directory is {}", entryDestination);
}
}
jarFile.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ function install_requirements() {
max_retry_times=3
retry_times=0
source $CURRENT_DIR/conda/bin/activate
install_command="$PYTHON_EXEC -m pip install --ignore-installed -r ${REQUIREMENTS_PATH}"
install_command="conda run -p $CURRENT_DIR/conda $PYTHON_EXEC -m pip install --ignore-installed -r ${REQUIREMENTS_PATH}"
${install_command} >/dev/null 2>&1
status=$?
while [[ ${status} -ne 0 ]] && [[ ${retry_times} -lt ${max_retry_times} ]]; do
Expand Down Expand Up @@ -105,10 +105,10 @@ function print_function() {
function download() {
local DOWNLOAD_STATUS=
if hash "wget" 2>/dev/null; then
wget "$1" -O "$2" -q -T20 -t3
wget "$1" -O "$2" -q -T1200 -t3
DOWNLOAD_STATUS="$?"
else
curl "$1" -o "$2" --progress-bar --connect-timeout 20 --retry 3
curl "$1" -o "$2" --progress-bar --connect-timeout 1200 --retry 3
DOWNLOAD_STATUS="$?"
fi
if [ $DOWNLOAD_STATUS -ne 0 ]; then
Expand Down
Loading