Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-335] Support to report k8s events & nodejs source url fix #336

Merged
merged 1 commit into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@
<nodeVersion>${build.node.version}</nodeVersion>
<npmVersion>${build.npm.version}</npmVersion>
<yarnVersion>${build.yarn.version}</yarnVersion>
<nodeDownloadRoot>https://npm.taobao.org/mirrors/node/</nodeDownloadRoot>
<nodeDownloadRoot>https://npmmirror.com/mirrors/node/</nodeDownloadRoot>
<npmDownloadRoot>https://registry.npmmirror.com/npm/-/</npmDownloadRoot>
<yarnDownloadRoot>https://repo.huaweicloud.com/yarn/</yarnDownloadRoot>
<npmRegistryURL>https://registry.npmmirror.com</npmRegistryURL>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ public String getClientAddress() {
public void setClientAddress(String clientAddress) {
this.clientAddress = clientAddress;
}

@Override
public String toString() {
return "ClusterMeta{" + "clientAddress='" + clientAddress + '\'' + ", masterAddress='"
+ masterAddress + '\'' + ", driverAddresses=" + driverAddresses + '}';
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.utils.ProcessUtil;
import com.antgroup.geaflow.stats.collector.StatsCollectorFactory;
import com.antgroup.geaflow.stats.model.EventLabel;
import com.antgroup.geaflow.stats.model.ExceptionLevel;
import java.io.IOException;
import java.lang.ProcessBuilder.Redirect;
Expand Down Expand Up @@ -50,8 +51,9 @@ public void asyncStart() {
startProcess();
} catch (Throwable e) {
LOGGER.error("Start process failed: {}", e.getMessage(), e);
StatsCollectorFactory.init(configuration).getExceptionCollector().reportException(
ExceptionLevel.FATAL, e);
String errMsg = String.format("Worker process exited: %s", e.getMessage());
StatsCollectorFactory.init(configuration).getEventCollector()
.reportEvent(ExceptionLevel.ERROR, EventLabel.WORKER_PROCESS_EXITED, errMsg);
}
});
}
Expand All @@ -64,9 +66,14 @@ public void startProcess() {
int code = childProcess.waitFor();
LOGGER.warn("Child process {} exits with code: {} and alive: {}", pid, code,
childProcess.isAlive());
if (code == 0 || restarts == 0) {
if (code == 0) {
break;
}
if (restarts == 0) {
String errMsg = String.format("Latest process %s exits with code: %s: "
+ "Exhausted after retrying startup %s times. ", pid, code, maxRestarts + 1);
throw new GeaflowRuntimeException(errMsg);
}
restarts--;
} while (true);
} catch (Exception e) {
Expand Down
2 changes: 1 addition & 1 deletion geaflow/geaflow-dashboard/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
<nodeVersion>${build.node.version}</nodeVersion>
<npmVersion>${build.npm.version}</npmVersion>
<yarnVersion>${build.yarn.version}</yarnVersion>
<nodeDownloadRoot>https://npm.taobao.org/mirrors/node/</nodeDownloadRoot>
<nodeDownloadRoot>https://npmmirror.com/mirrors/node/</nodeDownloadRoot>
<npmDownloadRoot>https://registry.npmmirror.com/npm/-/</npmDownloadRoot>
<yarnDownloadRoot>https://repo.huaweicloud.com/yarn/</yarnDownloadRoot>
<npmRegistryURL>https://registry.npmmirror.com</npmRegistryURL>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,32 @@
import com.antgroup.geaflow.cluster.clustermanager.IClusterManager;
import com.antgroup.geaflow.cluster.failover.IFailoverStrategy;
import com.antgroup.geaflow.env.IEnvironment.EnvType;
import com.antgroup.geaflow.stats.collector.StatsCollectorFactory;
import com.antgroup.geaflow.stats.model.EventLabel;
import com.antgroup.geaflow.stats.model.ExceptionLevel;

public abstract class AbstractFailoverStrategy implements IFailoverStrategy {

protected EnvType envType;
protected AbstractClusterManager clusterManager;
protected boolean enableSupervisor;
protected ClusterContext context;

public AbstractFailoverStrategy(EnvType envType) {
this.envType = envType;
}

@Override
public void init(ClusterContext context) {
this.context = context;
this.enableSupervisor = context.getConfig().getBoolean(SUPERVISOR_ENABLE);
}

protected void reportFailoverEvent(ExceptionLevel level, EventLabel label, String message) {
StatsCollectorFactory.init(context.getConfig()).getEventCollector()
.reportEvent(level, label, message);
}

public void setClusterManager(IClusterManager clusterManager) {
this.clusterManager = (AbstractClusterManager) clusterManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.antgroup.geaflow.cluster.clustermanager.ClusterContext;
import com.antgroup.geaflow.cluster.failover.FailoverStrategyType;
import com.antgroup.geaflow.env.IEnvironment.EnvType;
import com.antgroup.geaflow.stats.model.EventLabel;
import com.antgroup.geaflow.stats.model.ExceptionLevel;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -49,11 +51,16 @@ public void doFailover(int componentId, Throwable cause) {
clusterManager.restartAllDrivers();
clusterManager.restartAllContainers();
doKilling.set(false);
LOGGER.info("Completed failover in {} ms.", System.currentTimeMillis() - startTime);
String finishMessage = String.format("Completed cluster failover in %s ms.",
System.currentTimeMillis() - startTime);
LOGGER.info(finishMessage);
reportFailoverEvent(ExceptionLevel.INFO, EventLabel.FAILOVER_FINISH, finishMessage);
} else if (doKilling.compareAndSet(false, true)) {
String reason = cause == null ? null : cause.getMessage();
LOGGER.info("Start master failover triggered by component #{}: {}.",
componentId, reason);
String startMessage = String.format("Start master cluster failover triggered by "
+ "component #%s: %s.", componentId, reason);
LOGGER.info(startMessage);
reportFailoverEvent(ExceptionLevel.ERROR, EventLabel.FAILOVER_START, startMessage);
System.exit(EXIT_CODE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@
import com.antgroup.geaflow.cluster.clustermanager.ClusterContext;
import com.antgroup.geaflow.cluster.failover.FailoverStrategyType;
import com.antgroup.geaflow.env.IEnvironment.EnvType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DisableFailoverStrategy extends AbstractFailoverStrategy {

private static final Logger LOGGER =
LoggerFactory.getLogger(DisableFailoverStrategy.class);


public DisableFailoverStrategy(EnvType envType) {
super(envType);
}
Expand All @@ -33,6 +39,8 @@ public void init(ClusterContext context) {

@Override
public void doFailover(int componentId, Throwable cause) {
LOGGER.info("Failover is disabled, do nothing. Triggered by component #{}: {}.",
componentId, cause == null ? null : cause.getMessage());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.common.utils.SleepUtils;
import com.antgroup.geaflow.env.ctx.IEnvironmentContext;
import com.antgroup.geaflow.stats.collector.StatsCollectorFactory;
import com.antgroup.geaflow.stats.model.EventLabel;
import com.antgroup.geaflow.stats.model.ExceptionLevel;
import com.google.common.base.Preconditions;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.LoadBalancerIngress;
Expand Down Expand Up @@ -92,10 +95,16 @@ public IPipelineClient startCluster() {
config.getString(MASTER_EXPOSED_ADDRESS));
callback.onSuccess(clusterMeta);
LOGGER.info("Cluster info: {} config: {}", clusterMeta, config);
String successMsg = String.format("Start cluster success. Cluster info: %s", clusterMeta);
StatsCollectorFactory.init(config).getEventCollector()
.reportEvent(ExceptionLevel.INFO, EventLabel.START_CLUSTER_SUCCESS, successMsg);
return PipelineClientFactory.createPipelineClient(driverAddresses, config);
} catch (Throwable e) {
LOGGER.error("Deploy failed.", e);
callback.onFailure(e);
String failMsg = String.format("Start cluster failed: %s", e.getMessage());
StatsCollectorFactory.init(config).getEventCollector()
.reportEvent(ExceptionLevel.FATAL, EventLabel.START_CLUSTER_FAILED, failMsg);
kubernetesClient.destroyCluster(clusterId);
throw new GeaflowRuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.antgroup.geaflow.cluster.runner.failover.ComponentFailoverStrategy;
import com.antgroup.geaflow.common.exception.GeaflowHeartbeatException;
import com.antgroup.geaflow.env.IEnvironment.EnvType;
import com.antgroup.geaflow.stats.model.EventLabel;
import com.antgroup.geaflow.stats.model.ExceptionLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,13 +44,22 @@ public void init(ClusterContext clusterContext) {
public void doFailover(int componentId, Throwable cause) {
if (componentId != DEFAULT_MASTER_ID) {
if (cause instanceof GeaflowHeartbeatException) {
String startMessage = String.format("Start component failover for component #%s "
+ "cause by %s.", componentId, cause.getMessage());
LOGGER.info(startMessage);
reportFailoverEvent(ExceptionLevel.ERROR, EventLabel.FAILOVER_START, startMessage);

long startTime = System.currentTimeMillis();
if (clusterContext.getDriverIds().containsKey(componentId)) {
clusterManager.restartDriver(componentId);
} else {
clusterManager.restartContainer(componentId);
}
LOGGER.info("Completed failover in {} ms", System.currentTimeMillis() - startTime);

String finishMessage = String.format("Completed component failover for component "
+ "#%s in %s ms.", componentId, System.currentTimeMillis() - startTime);
LOGGER.info(finishMessage);
reportFailoverEvent(ExceptionLevel.INFO, EventLabel.FAILOVER_FINISH, finishMessage);
} else {
String reason = cause == null ? null : cause.getMessage();
LOGGER.warn("{} throws exception: {}", componentId, reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package com.antgroup.geaflow.cluster.k8s.handler;

import com.antgroup.geaflow.stats.collector.StatsCollectorFactory;
import com.antgroup.geaflow.stats.model.ExceptionLevel;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -35,4 +37,14 @@ public void notifyListeners(PodEvent event) {
}
}

protected void reportPodEvent(PodEvent event, ExceptionLevel level, String message) {
String eventMessage = buildEventMessage(event, message);
StatsCollectorFactory.getInstance().getEventCollector()
.reportEvent(level, event.getEventKind().name(), eventMessage);
}

private String buildEventMessage(PodEvent event, String message) {
return message + "\n" + event.toString();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 AntGroup CO., Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.antgroup.geaflow.cluster.k8s.handler;

import com.antgroup.geaflow.cluster.k8s.handler.PodHandlerRegistry.EventKind;
import com.antgroup.geaflow.cluster.k8s.utils.KubernetesUtils;
import com.antgroup.geaflow.stats.model.ExceptionLevel;
import io.fabric8.kubernetes.api.model.Pod;

public class PodAddedHandler extends AbstractPodHandler {

@Override
public void handle(Pod pod) {
String componentId = KubernetesUtils.extractComponentId(pod);
if (componentId != null) {
String addMessage = String.format("Pod #%s %s is created.",
componentId, pod.getMetadata().getName());
PodEvent event = new PodEvent(pod, EventKind.POD_ADDED);
reportPodEvent(event, ExceptionLevel.INFO, addMessage);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2023 AntGroup CO., Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.antgroup.geaflow.cluster.k8s.handler;

import com.antgroup.geaflow.cluster.k8s.handler.PodHandlerRegistry.EventKind;
import com.antgroup.geaflow.cluster.k8s.utils.KubernetesUtils;
import com.antgroup.geaflow.stats.model.ExceptionLevel;
import io.fabric8.kubernetes.api.model.Pod;

public class PodDeletedHandler extends AbstractPodHandler {

@Override
public void handle(Pod pod) {
String componentId = KubernetesUtils.extractComponentId(pod);
if (componentId != null) {
String deleteMessage = String.format("Pod #%s %s is deleted.",
componentId, pod.getMetadata().getName());
PodEvent event = new PodEvent(pod, EventKind.POD_DELETED);
reportPodEvent(event, ExceptionLevel.ERROR, deleteMessage);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package com.antgroup.geaflow.cluster.k8s.handler;

import com.antgroup.geaflow.cluster.k8s.handler.PodHandlerRegistry.EventKind;
import com.antgroup.geaflow.cluster.k8s.utils.KubernetesUtils;
import io.fabric8.kubernetes.api.model.Pod;
import java.io.Serializable;

public class PodEvent implements Serializable {
Expand All @@ -25,6 +27,18 @@ public class PodEvent implements Serializable {
private long ts;
private String containerId;

public PodEvent(Pod pod, EventKind kind) {
this(pod, kind, System.currentTimeMillis());
}

public PodEvent(Pod pod, EventKind kind, long ts) {
this.eventKind = kind;
this.containerId = KubernetesUtils.extractComponentId(pod);
this.podIp = pod.getStatus().getPodIP();
this.hostIp = pod.getStatus().getHostIP();
this.ts = ts;
}

public EventKind getEventKind() {
return eventKind;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.antgroup.geaflow.cluster.k8s.handler.PodHandlerRegistry.EventKind;
import com.antgroup.geaflow.cluster.k8s.utils.KubernetesUtils;
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.stats.model.ExceptionLevel;
import io.fabric8.kubernetes.api.model.Pod;
import java.util.Map;
import org.slf4j.Logger;
Expand All @@ -41,18 +42,15 @@ public void handle(Pod pod) {
String key = entry.getKey();
if (labels.get(key) != null && labels.get(key).equalsIgnoreCase(entry.getValue())) {
String componentId = KubernetesUtils.extractComponentId(pod);
LOG.info(
"Pod #{} {} will be removed, label: {} annotations: {}, total removed: {}",
String message = String.format(
"Pod #%s %s will be removed, label: %s annotations: %s, total removed: %s",
componentId, pod.getMetadata().getName(), key,
pod.getMetadata().getAnnotations(), ++totalCount);
LOG.info(message);

PodEvent event = new PodEvent();
event.setEventKind(EventKind.EVICTION);
event.setContainerId(componentId);
event.setHostIp(pod.getStatus().getHostIP());
event.setPodIp(pod.getStatus().getPodIP());
event.setTs(System.currentTimeMillis());
PodEvent event = new PodEvent(pod, EventKind.POD_EVICTION);
notifyListeners(event);
reportPodEvent(event, ExceptionLevel.WARN, message);
break;
}
}
Expand Down
Loading
Loading