From 4737cce4b9b7e0932c642eedff70ea84e1063acd Mon Sep 17 00:00:00 2001 From: dingyi Date: Wed, 24 Jan 2024 14:18:17 +0800 Subject: [PATCH] Support to report k8s events & nodejs source url fix --- .../geaflow-kubernetes-operator-web/pom.xml | 2 +- .../callback/ClusterStartedCallback.java | 6 +++ .../geaflow/cluster/runner/CommandRunner.java | 13 ++++-- geaflow/geaflow-dashboard/pom.xml | 2 +- .../failover/AbstractFailoverStrategy.java | 10 ++++ .../failover/ClusterFailoverStrategy.java | 13 ++++-- .../failover/DisableFailoverStrategy.java | 8 ++++ .../k8s/client/KubernetesClusterClient.java | 9 ++++ .../KubernetesComponentFailoverStrategy.java | 13 +++++- .../k8s/handler/AbstractPodHandler.java | 12 +++++ .../cluster/k8s/handler/PodAddedHandler.java | 34 ++++++++++++++ .../k8s/handler/PodDeletedHandler.java | 34 ++++++++++++++ .../geaflow/cluster/k8s/handler/PodEvent.java | 14 ++++++ .../cluster/k8s/handler/PodEvictHandler.java | 14 +++--- .../k8s/handler/PodHandlerRegistry.java | 33 +++++++++---- .../cluster/k8s/handler/PodOOMHandler.java | 12 ++--- .../cluster/k8s/utils/KubernetesUtils.java | 5 ++ .../k8s/watcher/KubernetesPodWatcher.java | 16 ++++--- .../stats/collector/EventCollector.java | 46 +++++++++++++++++++ .../collector/StatsCollectorFactory.java | 6 +++ .../geaflow/stats/model/EventInfo.java | 39 ++++++++++++++++ .../geaflow/stats/model/EventLabel.java | 29 ++++++++++++ .../geaflow/stats/model/ExceptionInfo.java | 23 ++++++---- .../geaflow/stats/model/StatsMetricType.java | 3 ++ 24 files changed, 348 insertions(+), 48 deletions(-) create mode 100644 geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodAddedHandler.java create mode 100644 geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodDeletedHandler.java create mode 100644 geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/collector/EventCollector.java create mode 100644 geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/model/EventInfo.java create mode 100644 geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/model/EventLabel.java diff --git a/geaflow-kubernetes-operator/geaflow-kubernetes-operator-web/pom.xml b/geaflow-kubernetes-operator/geaflow-kubernetes-operator-web/pom.xml index 638146384..a58b5717e 100644 --- a/geaflow-kubernetes-operator/geaflow-kubernetes-operator-web/pom.xml +++ b/geaflow-kubernetes-operator/geaflow-kubernetes-operator-web/pom.xml @@ -102,7 +102,7 @@ ${build.node.version} ${build.npm.version} ${build.yarn.version} - https://npm.taobao.org/mirrors/node/ + https://npmmirror.com/mirrors/node/ https://registry.npmmirror.com/npm/-/ https://repo.huaweicloud.com/yarn/ https://registry.npmmirror.com diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/client/callback/ClusterStartedCallback.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/client/callback/ClusterStartedCallback.java index 0a8d69dcd..ce3a7cea9 100644 --- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/client/callback/ClusterStartedCallback.java +++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/client/callback/ClusterStartedCallback.java @@ -75,6 +75,12 @@ public String getClientAddress() { public void setClientAddress(String clientAddress) { this.clientAddress = clientAddress; } + + @Override + public String toString() { + return "ClusterMeta{" + "clientAddress='" + clientAddress + '\'' + ", masterAddress='" + + masterAddress + '\'' + ", driverAddresses=" + driverAddresses + '}'; + } } } diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/CommandRunner.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/CommandRunner.java index 252949d46..a68a1df4b 100644 --- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/CommandRunner.java +++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/runner/CommandRunner.java @@ -18,6 +18,7 @@ import com.antgroup.geaflow.common.exception.GeaflowRuntimeException; import com.antgroup.geaflow.common.utils.ProcessUtil; import com.antgroup.geaflow.stats.collector.StatsCollectorFactory; +import com.antgroup.geaflow.stats.model.EventLabel; import com.antgroup.geaflow.stats.model.ExceptionLevel; import java.io.IOException; import java.lang.ProcessBuilder.Redirect; @@ -50,8 +51,9 @@ public void asyncStart() { startProcess(); } catch (Throwable e) { LOGGER.error("Start process failed: {}", e.getMessage(), e); - StatsCollectorFactory.init(configuration).getExceptionCollector().reportException( - ExceptionLevel.FATAL, e); + String errMsg = String.format("Worker process exited: %s", e.getMessage()); + StatsCollectorFactory.init(configuration).getEventCollector() + .reportEvent(ExceptionLevel.ERROR, EventLabel.WORKER_PROCESS_EXITED, errMsg); } }); } @@ -64,9 +66,14 @@ public void startProcess() { int code = childProcess.waitFor(); LOGGER.warn("Child process {} exits with code: {} and alive: {}", pid, code, childProcess.isAlive()); - if (code == 0 || restarts == 0) { + if (code == 0) { break; } + if (restarts == 0) { + String errMsg = String.format("Latest process %s exits with code: %s: " + + "Exhausted after retrying startup %s times. ", pid, code, maxRestarts + 1); + throw new GeaflowRuntimeException(errMsg); + } restarts--; } while (true); } catch (Exception e) { diff --git a/geaflow/geaflow-dashboard/pom.xml b/geaflow/geaflow-dashboard/pom.xml index 0009a67f5..a15632c91 100644 --- a/geaflow/geaflow-dashboard/pom.xml +++ b/geaflow/geaflow-dashboard/pom.xml @@ -85,7 +85,7 @@ ${build.node.version} ${build.npm.version} ${build.yarn.version} - https://npm.taobao.org/mirrors/node/ + https://npmmirror.com/mirrors/node/ https://registry.npmmirror.com/npm/-/ https://repo.huaweicloud.com/yarn/ https://registry.npmmirror.com diff --git a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/AbstractFailoverStrategy.java b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/AbstractFailoverStrategy.java index d7cf4e47a..ab7149847 100644 --- a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/AbstractFailoverStrategy.java +++ b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/AbstractFailoverStrategy.java @@ -21,12 +21,16 @@ import com.antgroup.geaflow.cluster.clustermanager.IClusterManager; import com.antgroup.geaflow.cluster.failover.IFailoverStrategy; import com.antgroup.geaflow.env.IEnvironment.EnvType; +import com.antgroup.geaflow.stats.collector.StatsCollectorFactory; +import com.antgroup.geaflow.stats.model.EventLabel; +import com.antgroup.geaflow.stats.model.ExceptionLevel; public abstract class AbstractFailoverStrategy implements IFailoverStrategy { protected EnvType envType; protected AbstractClusterManager clusterManager; protected boolean enableSupervisor; + protected ClusterContext context; public AbstractFailoverStrategy(EnvType envType) { this.envType = envType; @@ -34,9 +38,15 @@ public AbstractFailoverStrategy(EnvType envType) { @Override public void init(ClusterContext context) { + this.context = context; this.enableSupervisor = context.getConfig().getBoolean(SUPERVISOR_ENABLE); } + protected void reportFailoverEvent(ExceptionLevel level, EventLabel label, String message) { + StatsCollectorFactory.init(context.getConfig()).getEventCollector() + .reportEvent(level, label, message); + } + public void setClusterManager(IClusterManager clusterManager) { this.clusterManager = (AbstractClusterManager) clusterManager; } diff --git a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ClusterFailoverStrategy.java b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ClusterFailoverStrategy.java index ac31255ef..37a0723e4 100644 --- a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ClusterFailoverStrategy.java +++ b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/ClusterFailoverStrategy.java @@ -21,6 +21,8 @@ import com.antgroup.geaflow.cluster.clustermanager.ClusterContext; import com.antgroup.geaflow.cluster.failover.FailoverStrategyType; import com.antgroup.geaflow.env.IEnvironment.EnvType; +import com.antgroup.geaflow.stats.model.EventLabel; +import com.antgroup.geaflow.stats.model.ExceptionLevel; import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,11 +51,16 @@ public void doFailover(int componentId, Throwable cause) { clusterManager.restartAllDrivers(); clusterManager.restartAllContainers(); doKilling.set(false); - LOGGER.info("Completed failover in {} ms.", System.currentTimeMillis() - startTime); + String finishMessage = String.format("Completed cluster failover in %s ms.", + System.currentTimeMillis() - startTime); + LOGGER.info(finishMessage); + reportFailoverEvent(ExceptionLevel.INFO, EventLabel.FAILOVER_FINISH, finishMessage); } else if (doKilling.compareAndSet(false, true)) { String reason = cause == null ? null : cause.getMessage(); - LOGGER.info("Start master failover triggered by component #{}: {}.", - componentId, reason); + String startMessage = String.format("Start master cluster failover triggered by " + + "component #%s: %s.", componentId, reason); + LOGGER.info(startMessage); + reportFailoverEvent(ExceptionLevel.ERROR, EventLabel.FAILOVER_START, startMessage); System.exit(EXIT_CODE); } } diff --git a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/DisableFailoverStrategy.java b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/DisableFailoverStrategy.java index 15668509d..d5fe27aab 100644 --- a/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/DisableFailoverStrategy.java +++ b/geaflow/geaflow-deploy/geaflow-cluster-runner/src/main/java/com/antgroup/geaflow/cluster/runner/failover/DisableFailoverStrategy.java @@ -19,9 +19,15 @@ import com.antgroup.geaflow.cluster.clustermanager.ClusterContext; import com.antgroup.geaflow.cluster.failover.FailoverStrategyType; import com.antgroup.geaflow.env.IEnvironment.EnvType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DisableFailoverStrategy extends AbstractFailoverStrategy { + private static final Logger LOGGER = + LoggerFactory.getLogger(DisableFailoverStrategy.class); + + public DisableFailoverStrategy(EnvType envType) { super(envType); } @@ -33,6 +39,8 @@ public void init(ClusterContext context) { @Override public void doFailover(int componentId, Throwable cause) { + LOGGER.info("Failover is disabled, do nothing. Triggered by component #{}: {}.", + componentId, cause == null ? null : cause.getMessage()); } @Override diff --git a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/client/KubernetesClusterClient.java b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/client/KubernetesClusterClient.java index 2a2c977c7..7fe9fa227 100644 --- a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/client/KubernetesClusterClient.java +++ b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/client/KubernetesClusterClient.java @@ -41,6 +41,9 @@ import com.antgroup.geaflow.common.exception.GeaflowRuntimeException; import com.antgroup.geaflow.common.utils.SleepUtils; import com.antgroup.geaflow.env.ctx.IEnvironmentContext; +import com.antgroup.geaflow.stats.collector.StatsCollectorFactory; +import com.antgroup.geaflow.stats.model.EventLabel; +import com.antgroup.geaflow.stats.model.ExceptionLevel; import com.google.common.base.Preconditions; import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.LoadBalancerIngress; @@ -92,10 +95,16 @@ public IPipelineClient startCluster() { config.getString(MASTER_EXPOSED_ADDRESS)); callback.onSuccess(clusterMeta); LOGGER.info("Cluster info: {} config: {}", clusterMeta, config); + String successMsg = String.format("Start cluster success. Cluster info: %s", clusterMeta); + StatsCollectorFactory.init(config).getEventCollector() + .reportEvent(ExceptionLevel.INFO, EventLabel.START_CLUSTER_SUCCESS, successMsg); return PipelineClientFactory.createPipelineClient(driverAddresses, config); } catch (Throwable e) { LOGGER.error("Deploy failed.", e); callback.onFailure(e); + String failMsg = String.format("Start cluster failed: %s", e.getMessage()); + StatsCollectorFactory.init(config).getEventCollector() + .reportEvent(ExceptionLevel.FATAL, EventLabel.START_CLUSTER_FAILED, failMsg); kubernetesClient.destroyCluster(clusterId); throw new GeaflowRuntimeException(e); } diff --git a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/failover/KubernetesComponentFailoverStrategy.java b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/failover/KubernetesComponentFailoverStrategy.java index b1bd9ee1f..a6dc4fd82 100644 --- a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/failover/KubernetesComponentFailoverStrategy.java +++ b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/failover/KubernetesComponentFailoverStrategy.java @@ -20,6 +20,8 @@ import com.antgroup.geaflow.cluster.runner.failover.ComponentFailoverStrategy; import com.antgroup.geaflow.common.exception.GeaflowHeartbeatException; import com.antgroup.geaflow.env.IEnvironment.EnvType; +import com.antgroup.geaflow.stats.model.EventLabel; +import com.antgroup.geaflow.stats.model.ExceptionLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,13 +44,22 @@ public void init(ClusterContext clusterContext) { public void doFailover(int componentId, Throwable cause) { if (componentId != DEFAULT_MASTER_ID) { if (cause instanceof GeaflowHeartbeatException) { + String startMessage = String.format("Start component failover for component #%s " + + "cause by %s.", componentId, cause.getMessage()); + LOGGER.info(startMessage); + reportFailoverEvent(ExceptionLevel.ERROR, EventLabel.FAILOVER_START, startMessage); + long startTime = System.currentTimeMillis(); if (clusterContext.getDriverIds().containsKey(componentId)) { clusterManager.restartDriver(componentId); } else { clusterManager.restartContainer(componentId); } - LOGGER.info("Completed failover in {} ms", System.currentTimeMillis() - startTime); + + String finishMessage = String.format("Completed component failover for component " + + "#%s in %s ms.", componentId, System.currentTimeMillis() - startTime); + LOGGER.info(finishMessage); + reportFailoverEvent(ExceptionLevel.INFO, EventLabel.FAILOVER_FINISH, finishMessage); } else { String reason = cause == null ? null : cause.getMessage(); LOGGER.warn("{} throws exception: {}", componentId, reason); diff --git a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/AbstractPodHandler.java b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/AbstractPodHandler.java index 8bb0c38bb..f3d428c58 100644 --- a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/AbstractPodHandler.java +++ b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/AbstractPodHandler.java @@ -14,6 +14,8 @@ package com.antgroup.geaflow.cluster.k8s.handler; +import com.antgroup.geaflow.stats.collector.StatsCollectorFactory; +import com.antgroup.geaflow.stats.model.ExceptionLevel; import java.util.ArrayList; import java.util.List; @@ -35,4 +37,14 @@ public void notifyListeners(PodEvent event) { } } + protected void reportPodEvent(PodEvent event, ExceptionLevel level, String message) { + String eventMessage = buildEventMessage(event, message); + StatsCollectorFactory.getInstance().getEventCollector() + .reportEvent(level, event.getEventKind().name(), eventMessage); + } + + private String buildEventMessage(PodEvent event, String message) { + return message + "\n" + event.toString(); + } + } diff --git a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodAddedHandler.java b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodAddedHandler.java new file mode 100644 index 000000000..59d52e5b1 --- /dev/null +++ b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodAddedHandler.java @@ -0,0 +1,34 @@ +/* + * Copyright 2023 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +package com.antgroup.geaflow.cluster.k8s.handler; + +import com.antgroup.geaflow.cluster.k8s.handler.PodHandlerRegistry.EventKind; +import com.antgroup.geaflow.cluster.k8s.utils.KubernetesUtils; +import com.antgroup.geaflow.stats.model.ExceptionLevel; +import io.fabric8.kubernetes.api.model.Pod; + +public class PodAddedHandler extends AbstractPodHandler { + + @Override + public void handle(Pod pod) { + String componentId = KubernetesUtils.extractComponentId(pod); + if (componentId != null) { + String addMessage = String.format("Pod #%s %s is created.", + componentId, pod.getMetadata().getName()); + PodEvent event = new PodEvent(pod, EventKind.POD_ADDED); + reportPodEvent(event, ExceptionLevel.INFO, addMessage); + } + } +} diff --git a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodDeletedHandler.java b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodDeletedHandler.java new file mode 100644 index 000000000..7e1a06656 --- /dev/null +++ b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodDeletedHandler.java @@ -0,0 +1,34 @@ +/* + * Copyright 2023 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +package com.antgroup.geaflow.cluster.k8s.handler; + +import com.antgroup.geaflow.cluster.k8s.handler.PodHandlerRegistry.EventKind; +import com.antgroup.geaflow.cluster.k8s.utils.KubernetesUtils; +import com.antgroup.geaflow.stats.model.ExceptionLevel; +import io.fabric8.kubernetes.api.model.Pod; + +public class PodDeletedHandler extends AbstractPodHandler { + + @Override + public void handle(Pod pod) { + String componentId = KubernetesUtils.extractComponentId(pod); + if (componentId != null) { + String deleteMessage = String.format("Pod #%s %s is deleted.", + componentId, pod.getMetadata().getName()); + PodEvent event = new PodEvent(pod, EventKind.POD_DELETED); + reportPodEvent(event, ExceptionLevel.ERROR, deleteMessage); + } + } +} diff --git a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodEvent.java b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodEvent.java index 78492a307..ba5881bdd 100644 --- a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodEvent.java +++ b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodEvent.java @@ -15,6 +15,8 @@ package com.antgroup.geaflow.cluster.k8s.handler; import com.antgroup.geaflow.cluster.k8s.handler.PodHandlerRegistry.EventKind; +import com.antgroup.geaflow.cluster.k8s.utils.KubernetesUtils; +import io.fabric8.kubernetes.api.model.Pod; import java.io.Serializable; public class PodEvent implements Serializable { @@ -25,6 +27,18 @@ public class PodEvent implements Serializable { private long ts; private String containerId; + public PodEvent(Pod pod, EventKind kind) { + this(pod, kind, System.currentTimeMillis()); + } + + public PodEvent(Pod pod, EventKind kind, long ts) { + this.eventKind = kind; + this.containerId = KubernetesUtils.extractComponentId(pod); + this.podIp = pod.getStatus().getPodIP(); + this.hostIp = pod.getStatus().getHostIP(); + this.ts = ts; + } + public EventKind getEventKind() { return eventKind; } diff --git a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodEvictHandler.java b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodEvictHandler.java index a75c61a4b..84a9b9a72 100644 --- a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodEvictHandler.java +++ b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodEvictHandler.java @@ -19,6 +19,7 @@ import com.antgroup.geaflow.cluster.k8s.handler.PodHandlerRegistry.EventKind; import com.antgroup.geaflow.cluster.k8s.utils.KubernetesUtils; import com.antgroup.geaflow.common.config.Configuration; +import com.antgroup.geaflow.stats.model.ExceptionLevel; import io.fabric8.kubernetes.api.model.Pod; import java.util.Map; import org.slf4j.Logger; @@ -41,18 +42,15 @@ public void handle(Pod pod) { String key = entry.getKey(); if (labels.get(key) != null && labels.get(key).equalsIgnoreCase(entry.getValue())) { String componentId = KubernetesUtils.extractComponentId(pod); - LOG.info( - "Pod #{} {} will be removed, label: {} annotations: {}, total removed: {}", + String message = String.format( + "Pod #%s %s will be removed, label: %s annotations: %s, total removed: %s", componentId, pod.getMetadata().getName(), key, pod.getMetadata().getAnnotations(), ++totalCount); + LOG.info(message); - PodEvent event = new PodEvent(); - event.setEventKind(EventKind.EVICTION); - event.setContainerId(componentId); - event.setHostIp(pod.getStatus().getHostIP()); - event.setPodIp(pod.getStatus().getPodIP()); - event.setTs(System.currentTimeMillis()); + PodEvent event = new PodEvent(pod, EventKind.POD_EVICTION); notifyListeners(event); + reportPodEvent(event, ExceptionLevel.WARN, message); break; } } diff --git a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodHandlerRegistry.java b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodHandlerRegistry.java index c921b58cc..f05d751bb 100644 --- a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodHandlerRegistry.java +++ b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodHandlerRegistry.java @@ -15,19 +15,30 @@ package com.antgroup.geaflow.cluster.k8s.handler; import com.antgroup.geaflow.common.config.Configuration; -import java.util.Collection; +import io.fabric8.kubernetes.client.Watcher.Action; import java.util.HashMap; import java.util.Map; public class PodHandlerRegistry { - private final Map eventHandlerMap; + private final Map> eventHandlerMap; private static PodHandlerRegistry INSTANCE; private PodHandlerRegistry(Configuration configuration) { this.eventHandlerMap = new HashMap<>(); - this.eventHandlerMap.put(EventKind.OOM, new PodOOMHandler()) ; - this.eventHandlerMap.put(EventKind.EVICTION, new PodEvictHandler(configuration)); + + Map modifiedHandlerMap = new HashMap<>(); + modifiedHandlerMap.put(EventKind.POD_OOM, new PodOOMHandler()) ; + modifiedHandlerMap.put(EventKind.POD_EVICTION, new PodEvictHandler(configuration)); + this.eventHandlerMap.put(Action.MODIFIED, modifiedHandlerMap); + + Map addedHandlerMap = new HashMap<>(); + addedHandlerMap.put(EventKind.POD_ADDED, new PodAddedHandler()); + this.eventHandlerMap.put(Action.ADDED, addedHandlerMap); + + Map deletedHandlerMap = new HashMap<>(); + deletedHandlerMap.put(EventKind.POD_DELETED, new PodDeletedHandler()); + this.eventHandlerMap.put(Action.DELETED, deletedHandlerMap); } public static synchronized PodHandlerRegistry getInstance(Configuration configuration) { @@ -37,13 +48,19 @@ public static synchronized PodHandlerRegistry getInstance(Configuration configur return INSTANCE; } - public Collection getHandlers() { - return eventHandlerMap.values(); + public void registerListener(Action action, EventKind eventKind, IEventListener listener) { + ((AbstractPodHandler) eventHandlerMap.get(action).get(eventKind)).addListener(listener); + } + + public Map> getHandlerMap() { + return eventHandlerMap; } public enum EventKind { - OOM, - EVICTION + POD_ADDED, + POD_DELETED, + POD_OOM, + POD_EVICTION } } diff --git a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodOOMHandler.java b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodOOMHandler.java index 6371cd7c4..22ccd1844 100644 --- a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodOOMHandler.java +++ b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/handler/PodOOMHandler.java @@ -21,7 +21,7 @@ import com.antgroup.geaflow.cluster.k8s.handler.PodHandlerRegistry.EventKind; import com.antgroup.geaflow.cluster.k8s.utils.KubernetesUtils; import com.antgroup.geaflow.common.tuple.Tuple; -import com.antgroup.geaflow.stats.collector.StatsCollectorFactory; +import com.antgroup.geaflow.stats.model.ExceptionLevel; import io.fabric8.kubernetes.api.model.ContainerState; import io.fabric8.kubernetes.api.model.ContainerStatus; import io.fabric8.kubernetes.api.model.Pod; @@ -91,18 +91,12 @@ public void handle(Pod pod) { LOGGER.info("Pod #{} {} oom killed at {}, totally: {}", componentId, pod.getMetadata().getName(), parsed, totalOOMCount); - PodEvent oomEvent = new PodEvent(); - oomEvent.setEventKind(EventKind.OOM); - oomEvent.setTs(exceptionTime); - oomEvent.setPodIp(pod.getStatus().getPodIP()); - oomEvent.setHostIp(pod.getStatus().getHostIP()); - oomEvent.setContainerId(componentId); + PodEvent oomEvent = new PodEvent(pod, EventKind.POD_OOM, exceptionTime); notifyListeners(oomEvent); String errMsg = String.format("pod %s oom killed at %s", pod.getMetadata().getName(), parsed); - StatsCollectorFactory.getInstance().getExceptionCollector() - .reportException(new OutOfMemoryError(errMsg)); + reportPodEvent(oomEvent, ExceptionLevel.ERROR, errMsg); } } } diff --git a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/utils/KubernetesUtils.java b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/utils/KubernetesUtils.java index e35cb0fb3..4667dae1c 100644 --- a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/utils/KubernetesUtils.java +++ b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/utils/KubernetesUtils.java @@ -331,6 +331,11 @@ public static String extractComponentId(Pod pod) { return pod.getMetadata().getLabels().get(K8SConstants.LABEL_COMPONENT_ID_KEY); } + @Nullable + public static String extractComponent(Pod pod) { + return pod.getMetadata().getLabels().get(K8SConstants.LABEL_COMPONENT_KEY); + } + public static String encodeRpcAddressMap(Map addressMap) { return Joiner.on(CONFIG_LIST_SEPARATOR).withKeyValueSeparator(ADDRESS_SEPARATOR) .join(addressMap); diff --git a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/watcher/KubernetesPodWatcher.java b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/watcher/KubernetesPodWatcher.java index 69bff3b62..ba9324550 100644 --- a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/watcher/KubernetesPodWatcher.java +++ b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/watcher/KubernetesPodWatcher.java @@ -21,6 +21,7 @@ import com.antgroup.geaflow.cluster.k8s.config.K8SConstants; import com.antgroup.geaflow.cluster.k8s.handler.IPodEventHandler; import com.antgroup.geaflow.cluster.k8s.handler.PodHandlerRegistry; +import com.antgroup.geaflow.cluster.k8s.handler.PodHandlerRegistry.EventKind; import com.antgroup.geaflow.cluster.k8s.utils.KubernetesUtils; import com.antgroup.geaflow.common.config.Configuration; import com.antgroup.geaflow.common.utils.ThreadUtil; @@ -28,7 +29,6 @@ import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.Watcher.Action; -import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executors; @@ -45,7 +45,7 @@ public class KubernetesPodWatcher { private Watch watcher; private final int checkInterval; private volatile boolean watcherClosed; - private final Collection eventHandlers; + private final Map> eventHandlerMap; private final GeaflowKubeClient kubernetesClient; private final Map labels; private final ScheduledExecutorService executorService; @@ -62,7 +62,7 @@ public KubernetesPodWatcher(Configuration config) { ThreadUtil.namedThreadFactory(true, "cluster-watcher")); PodHandlerRegistry registry = PodHandlerRegistry.getInstance(config); - this.eventHandlers = registry.getHandlers(); + this.eventHandlerMap = registry.getHandlerMap(); } public void start() { @@ -93,7 +93,7 @@ private void createAndStartPodsWatcher() { watcherClosed = false; } } - }, checkInterval, checkInterval, TimeUnit.SECONDS); + }, 0, checkInterval, TimeUnit.SECONDS); } private void handlePodMessage(Watcher.Action action, Pod pod) { @@ -103,8 +103,12 @@ private void handlePodMessage(Watcher.Action action, Pod pod) { pod.getMetadata().getLabels(), action); return; } - if (action == Action.MODIFIED) { - eventHandlers.forEach(h -> h.handle(pod)); + String component = KubernetesUtils.extractComponent(pod); + if (K8SConstants.LABEL_COMPONENT_CLIENT.equals(component)) { + return; + } + if (eventHandlerMap.containsKey(action)) { + eventHandlerMap.get(action).forEach((kind, handler) -> handler.handle(pod)); } else { LOGGER.info("Skip {} event for pod {}", action, pod.getMetadata().getName()); } diff --git a/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/collector/EventCollector.java b/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/collector/EventCollector.java new file mode 100644 index 000000000..0a106e207 --- /dev/null +++ b/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/collector/EventCollector.java @@ -0,0 +1,46 @@ +/* + * Copyright 2023 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +package com.antgroup.geaflow.stats.collector; + +import com.antgroup.geaflow.common.config.Configuration; +import com.antgroup.geaflow.common.utils.ProcessUtil; +import com.antgroup.geaflow.stats.model.EventInfo; +import com.antgroup.geaflow.stats.model.EventLabel; +import com.antgroup.geaflow.stats.model.ExceptionLevel; +import com.antgroup.geaflow.stats.model.StatsMetricType; +import com.antgroup.geaflow.stats.sink.IStatsWriter; + +public class EventCollector extends BaseStatsCollector { + + EventCollector(IStatsWriter statsWriter, Configuration configuration) { + super(statsWriter, configuration); + } + + public void reportEvent(ExceptionLevel severityLevel, EventLabel label, String message) { + reportEvent(severityLevel, label.name(), message); + } + + public void reportEvent(ExceptionLevel severityLevel, String label, String message) { + EventInfo eventInfo = new EventInfo(ProcessUtil.getHostname(), + ProcessUtil.getHostIp(), ProcessUtil.getProcessId(), + message, severityLevel.name(), label); + addToWriterQueue(genEventKey(), eventInfo); + } + + private String genEventKey() { + return jobName + StatsMetricType.Event.getValue() + (Long.MAX_VALUE - System.currentTimeMillis()); + } + +} diff --git a/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/collector/StatsCollectorFactory.java b/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/collector/StatsCollectorFactory.java index caae85e49..408ef207c 100644 --- a/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/collector/StatsCollectorFactory.java +++ b/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/collector/StatsCollectorFactory.java @@ -22,6 +22,7 @@ public class StatsCollectorFactory { private final ExceptionCollector exceptionCollector; + private final EventCollector eventCollector; private final PipelineStatsCollector pipelineStatsCollector; private final ProcessStatsCollector processStatsCollector; private final MetricMetaCollector metricMetaCollector; @@ -33,6 +34,7 @@ public class StatsCollectorFactory { private StatsCollectorFactory(Configuration configuration) { this.syncWriter = StatsWriterFactory.getStatsWriter(configuration, true); this.exceptionCollector = new ExceptionCollector(syncWriter, configuration); + this.eventCollector = new EventCollector(syncWriter, configuration); this.metricCache = new MetricCache(configuration); IStatsWriter statsWriter = StatsWriterFactory.getStatsWriter(configuration); this.pipelineStatsCollector = new PipelineStatsCollector(statsWriter, configuration, metricCache); @@ -56,6 +58,10 @@ public ExceptionCollector getExceptionCollector() { return exceptionCollector; } + public EventCollector getEventCollector() { + return eventCollector; + } + public PipelineStatsCollector getPipelineStatsCollector() { return pipelineStatsCollector; } diff --git a/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/model/EventInfo.java b/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/model/EventInfo.java new file mode 100644 index 000000000..f977b1cef --- /dev/null +++ b/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/model/EventInfo.java @@ -0,0 +1,39 @@ +/* + * Copyright 2023 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +package com.antgroup.geaflow.stats.model; + +public class EventInfo extends ExceptionInfo { + + private String label; + + public EventInfo(String hostname, String ip, int processId, String message, String severity, + String label) { + super(hostname, ip, processId, message, severity); + this.label = label; + } + + public String getLabel() { + return label; + } + + public void setLabel(String label) { + this.label = label; + } + + @Override + public String toString() { + return "EventInfo{" + "label='" + label + '\'' + ", super=" + super.toString() + '}'; + } +} diff --git a/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/model/EventLabel.java b/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/model/EventLabel.java new file mode 100644 index 000000000..16d361370 --- /dev/null +++ b/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/model/EventLabel.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +package com.antgroup.geaflow.stats.model; + +public enum EventLabel { + + START_CLUSTER_SUCCESS, + + START_CLUSTER_FAILED, + + WORKER_PROCESS_EXITED, + + FAILOVER_START, + + FAILOVER_FINISH + +} diff --git a/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/model/ExceptionInfo.java b/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/model/ExceptionInfo.java index f708c5e27..b4925a5b2 100644 --- a/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/model/ExceptionInfo.java +++ b/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/model/ExceptionInfo.java @@ -18,21 +18,21 @@ public class ExceptionInfo implements Serializable { - private String hostname; - private String ip; - private int processId; - private String severity; - private String message; - private long timestamp; + protected String hostname; + protected String ip; + protected int processId; + protected String severity; + protected String message; + protected long timestamp; public ExceptionInfo(String hostname, String ip, int processId, String message, - String label) { + String severity) { this.hostname = hostname; this.ip = ip; this.processId = processId; this.message = message; this.timestamp = System.currentTimeMillis(); - this.severity = label; + this.severity = severity; } public String getIp() { @@ -82,6 +82,13 @@ public String getSeverity() { public void setSeverity(String severity) { this.severity = severity; } + + @Override + public String toString() { + return "ExceptionInfo{" + "hostname='" + hostname + '\'' + ", ip='" + ip + '\'' + + ", processId=" + processId + ", severity='" + severity + '\'' + ", message='" + + message + '\'' + ", timestamp=" + timestamp + '}'; + } } diff --git a/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/model/StatsMetricType.java b/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/model/StatsMetricType.java index 6f9016115..d1972f342 100644 --- a/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/model/StatsMetricType.java +++ b/geaflow/geaflow-metrics/geaflow-stats-metrics/src/main/java/com/antgroup/geaflow/stats/model/StatsMetricType.java @@ -19,6 +19,9 @@ public enum StatsMetricType { /** exception log. */ Exception("_exception_"), + /** event log. */ + Event("_event_"), + /** runtime metrics. */ Metrics("_metrics_"),