diff --git a/src/main/kotlin/eu/openanalytics/shinyproxyoperator/Operator.kt b/src/main/kotlin/eu/openanalytics/shinyproxyoperator/Operator.kt
index a8bb94c..550aa6e 100644
--- a/src/main/kotlin/eu/openanalytics/shinyproxyoperator/Operator.kt
+++ b/src/main/kotlin/eu/openanalytics/shinyproxyoperator/Operator.kt
@@ -26,6 +26,7 @@ import eu.openanalytics.shinyproxyoperator.controller.IRecyclableChecker
 import eu.openanalytics.shinyproxyoperator.controller.IngressController
 import eu.openanalytics.shinyproxyoperator.controller.PodRetriever
 import eu.openanalytics.shinyproxyoperator.controller.RecyclableChecker
+import eu.openanalytics.shinyproxyoperator.controller.ReplicaSetStatusChecker
 import eu.openanalytics.shinyproxyoperator.controller.ResourceListener
 import eu.openanalytics.shinyproxyoperator.controller.ResourceRetriever
 import eu.openanalytics.shinyproxyoperator.controller.ShinyProxyController
@@ -81,6 +82,7 @@ class Operator(client: NamespacedKubernetesClient? = null,
     private val podRetriever: PodRetriever
     private val shinyProxyClient: ShinyProxyClient
     private val recyclableChecker: IRecyclableChecker
+    private val replicaSetStatusChecker: ReplicaSetStatusChecker
 
     private val shinyProxyListener: ShinyProxyListener
     private val replicaSetListener: ResourceListener>
@@ -151,6 +153,7 @@ class Operator(client: NamespacedKubernetesClient? = null,
         shinyProxyListener = ShinyProxyListener(sendChannel, this.shinyProxyClient)
         podRetriever = PodRetriever(this.client)
         this.recyclableChecker = recyclableChecker ?: RecyclableChecker(podRetriever)
+        replicaSetStatusChecker = ReplicaSetStatusChecker(podRetriever)
 
         if (this.mode == Mode.CLUSTERED) {
             replicaSetListener = ResourceListener(sendChannel, this.client.inAnyNamespace().apps().replicaSets())
@@ -172,7 +175,7 @@ class Operator(client: NamespacedKubernetesClient? = null,
     /**
      * Controllers
      */
-    val shinyProxyController = ShinyProxyController(channel, this.client, shinyProxyClient, serviceController, ingressController, reconcileListener, this.recyclableChecker)
+    val shinyProxyController = ShinyProxyController(channel, this.client, shinyProxyClient, serviceController, ingressController, reconcileListener, this.recyclableChecker, this.replicaSetStatusChecker)
 
     private fun _checkCrdExists(name: String, shortName: String) {
         try {
diff --git a/src/main/kotlin/eu/openanalytics/shinyproxyoperator/components/EventFactory.kt b/src/main/kotlin/eu/openanalytics/shinyproxyoperator/components/EventFactory.kt
new file mode 100644
index 0000000..810eb15
--- /dev/null
+++ b/src/main/kotlin/eu/openanalytics/shinyproxyoperator/components/EventFactory.kt
@@ -0,0 +1,146 @@
+/**
+ * ShinyProxy-Operator
+ *
+ * Copyright (C) 2021-2024 Open Analytics
+ *
+ * ===========================================================================
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Apache License as published by
+ * The Apache Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Apache License for more details.
+ *
+ * You should have received a copy of the Apache License
+ * along with this program. If not, see <http://www.apache.org/licenses/>
+ */
+package eu.openanalytics.shinyproxyoperator.components
+
+import eu.openanalytics.shinyproxyoperator.crd.ShinyProxy
+import eu.openanalytics.shinyproxyoperator.crd.ShinyProxyInstance
+import io.fabric8.kubernetes.api.model.EventBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+import mu.KotlinLogging
+import java.time.Instant
+import java.time.ZonedDateTime
+import java.time.format.DateTimeFormatter
+
+
+/**
+ * Creates Kubernetes events describing the lifecycle of a ShinyProxy instance.
+ */
+class EventFactory(private val kubeClient: KubernetesClient) {
+
+    private val logger = KotlinLogging.logger {}
+
+    /** Records that a new instance is being started for [shinyProxy]. */
+    fun createNewInstanceEvent(shinyProxy: ShinyProxy, shinyProxyInstance: ShinyProxyInstance) {
+        createEvent(shinyProxy, shinyProxyInstance, "Normal", "StartingNewInstance", "Configuration changed")
+    }
+
+    /** Records that [shinyProxyInstance] has become ready. */
+    fun createInstanceReadyEvent(shinyProxy: ShinyProxy, shinyProxyInstance: ShinyProxyInstance) {
+        createEvent(shinyProxy, shinyProxyInstance, "Normal", "InstanceReady", "ShinyProxy ready")
+    }
+
+    /**
+     * Builds and creates a single event that is owned by, and refers to, [shinyProxy].
+     *
+     * @param type `Normal` or `Warning`
+     * @param action machine-readable name of the action the event is about
+     * @param reason human-readable explanation of why the action was taken
+     * @param message optional details; left unset when null
+     */
+    private fun createEvent(shinyProxy: ShinyProxy, shinyProxyInstance: ShinyProxyInstance, type: String, action: String, reason: String, message: String? = null) {
+        //@formatter:off
+        val eventBuilder = EventBuilder()
+            .withNewMetadata()
+                // name prefix only: Kubernetes appends a random suffix to generateName
+                .withGenerateName("${shinyProxy.metadata.name}-")
+                .withNamespace(shinyProxy.metadata.namespace)
+                .addNewOwnerReference()
+                    .withController(true)
+                    .withKind("ShinyProxy")
+                    .withApiVersion("openanalytics.eu/v1")
+                    .withName(shinyProxy.metadata.name)
+                    .withUid(shinyProxy.metadata.uid)
+                .endOwnerReference()
+                .withLabels(LabelFactory.labelsForShinyProxyInstance(shinyProxy, shinyProxyInstance))
+            .endMetadata()
+            .withNewInvolvedObject()
+                .withKind("ShinyProxy")
+                .withApiVersion("openanalytics.eu/v1")
+                .withName(shinyProxy.metadata.name)
+                .withNamespace(shinyProxy.metadata.namespace)
+                .withUid(shinyProxy.metadata.uid)
+            .endInvolvedObject()
+            .withNewEventTime(K8S_MICRO_TIME.format(ZonedDateTime.now()))
+            .withReportingInstance("shinyproxy-operator") // TODO
+            .withReportingComponent("shinyproxy-operator")
+            .withAction(action) // which action failed, machine-readable
+            .withType(type) // Warning or Normal
+            .withReason(reason) // reason is why the action was taken, human-readable, 128 characters
+        //@formatter:on
+
+        if (message != null) {
+            eventBuilder.withMessage(message)
+        }
+
+        kubeClient.v1().events().resource(eventBuilder.build()).create()
+    }
+
+    /** Caps [message] at [MAX_MESSAGE_LENGTH] characters, marking any cut with [TRUNCATED_SUFFIX]. */
+    private fun truncateMessage(message: String?): String? {
+        if (message == null || message.length <= MAX_MESSAGE_LENGTH) {
+            return message
+        }
+        return message.take(MAX_MESSAGE_LENGTH - TRUNCATED_SUFFIX.length) + TRUNCATED_SUFFIX
+    }
+
+    /**
+     * Emits a `Warning` event for a failing instance, unless the same failure was already reported.
+     *
+     * A failure counts as already reported when an event carrying the instance's labels has the same
+     * action, type and (truncated) message and is newer than [creationTimestamp]. A null
+     * [creationTimestamp] means there is no lower bound, so any such event suppresses the new one.
+     */
+    fun createInstanceFailed(shinyProxy: ShinyProxy, shinyProxyInstance: ShinyProxyInstance, message: String?, creationTimestamp: Instant?) {
+        val events = kubeClient.v1().events().withLabels(LabelFactory.labelsForShinyProxyInstance(shinyProxy, shinyProxyInstance)).list()
+        val truncatedMessage = truncateMessage(message)
+        val alreadyReported = events.items.any { event ->
+            event.action == "StartingNewInstanceFailed"
+                && event.type == "Warning"
+                && event.message == truncatedMessage
+                && isNewerThan(event.eventTime?.time, creationTimestamp)
+        }
+        if (alreadyReported) {
+            return
+        }
+        logger.warn { "${shinyProxy.logPrefix(shinyProxyInstance)} Pods are failing: ${message?.replace("\n", "")}" }
+        createEvent(shinyProxy, shinyProxyInstance, "Warning", "StartingNewInstanceFailed", "ShinyProxy failed to start", truncatedMessage)
+    }
+
+    /** Null-safe "is [eventTime] after [threshold]": no threshold always passes, a missing event time never does. */
+    private fun isNewerThan(eventTime: String?, threshold: Instant?): Boolean {
+        if (threshold == null) {
+            return true
+        }
+        if (eventTime == null) {
+            return false
+        }
+        return Instant.parse(eventTime) > threshold
+    }
+
+    companion object {
+        private const val MAX_MESSAGE_LENGTH = 1024
+        private const val TRUNCATED_SUFFIX = " [truncated]"
+
+        /** Timestamp layout written into the event time: microsecond precision with a zone offset. */
+        private val K8S_MICRO_TIME: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'.'SSSSSSXXX")
+    }
+
+}
diff --git a/src/main/kotlin/eu/openanalytics/shinyproxyoperator/components/PodTemplateSpecFactory.kt b/src/main/kotlin/eu/openanalytics/shinyproxyoperator/components/PodTemplateSpecFactory.kt
index 831d691..f195aab 100644
--- a/src/main/kotlin/eu/openanalytics/shinyproxyoperator/components/PodTemplateSpecFactory.kt
+++ b/src/main/kotlin/eu/openanalytics/shinyproxyoperator/components/PodTemplateSpecFactory.kt
@@ -121,6 +121,7 @@ class PodTemplateSpecFactory {
                         .withFailureThreshold(6)
                         .withPeriodSeconds(5)
                     .endStartupProbe()
+                    .withTerminationMessagePolicy("FallbackToLogsOnError")
                 .endContainer()
                 .withAffinity(createAffinity(shinyProxy, shinyProxyInstance))
                 .withVolumes(VolumeBuilder()
diff --git a/src/main/kotlin/eu/openanalytics/shinyproxyoperator/controller/ReplicaSetStatusChecker.kt b/src/main/kotlin/eu/openanalytics/shinyproxyoperator/controller/ReplicaSetStatusChecker.kt
new file mode 100644
index 0000000..eb16aff
--- /dev/null
+++ b/src/main/kotlin/eu/openanalytics/shinyproxyoperator/controller/ReplicaSetStatusChecker.kt
@@ -0,0 +1,57 @@
+/**
+ * ShinyProxy-Operator
+ *
+ * Copyright (C) 2021-2024 Open Analytics
+ *
+ * ===========================================================================
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Apache License as published by
+ * The Apache Software Foundation, either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Apache License for more details.
+ *
+ * You should have received a copy of the Apache License
+ * along with this program. If not, see <http://www.apache.org/licenses/>
+ */
+package eu.openanalytics.shinyproxyoperator.controller
+
+import eu.openanalytics.shinyproxyoperator.crd.ShinyProxy
+import eu.openanalytics.shinyproxyoperator.crd.ShinyProxyInstance
+import java.time.Instant
+
+class ReplicaSetStatusChecker(private val podRetriever: PodRetriever) {
+
+    data class Status(val failed: Boolean, val failureMessage: String?, val creationTimestamp: Instant?)
+
+    fun check(shinyProxy: ShinyProxy, shinyProxyInstance: ShinyProxyInstance): Status {
+        val pods = podRetriever.getShinyProxyPods(shinyProxy, shinyProxyInstance)
+        if (pods.isEmpty()) {
+            // no pods -> not ready yet, but not failed
+            return Status(false, null, null)
+        }
+        for (pod in pods) {
+            if (pod.status.containerStatuses.isEmpty()) {
+                // no container status yet
+                continue
+            }
+            val shinyproxyContainer = pod.status.containerStatuses.firstOrNull { it.name == "shinyproxy" }
+            if (shinyproxyContainer == null) {
+                // no shinyproxy container status
+                continue
+            }
+            if (!shinyproxyContainer.ready && shinyproxyContainer.restartCount >= 1) {
+                // container has failed
+                val msg = shinyproxyContainer.lastState?.terminated?.message ?: "Unknown error"
+                val creationTimestamp = Instant.parse(pod.metadata.creationTimestamp)
+                return Status(true, msg, creationTimestamp)
+            }
+        }
+        return Status(false, null, null)
+    }
+
+}
diff --git a/src/main/kotlin/eu/openanalytics/shinyproxyoperator/controller/ShinyProxyController.kt b/src/main/kotlin/eu/openanalytics/shinyproxyoperator/controller/ShinyProxyController.kt
index 9696d10..760d436 100644
--- a/src/main/kotlin/eu/openanalytics/shinyproxyoperator/controller/ShinyProxyController.kt
+++ b/src/main/kotlin/eu/openanalytics/shinyproxyoperator/controller/ShinyProxyController.kt
@@ -22,6 +22,7 @@ package eu.openanalytics.shinyproxyoperator.controller
 
 import eu.openanalytics.shinyproxyoperator.ShinyProxyClient
 import eu.openanalytics.shinyproxyoperator.components.ConfigMapFactory
+import eu.openanalytics.shinyproxyoperator.components.EventFactory
 import eu.openanalytics.shinyproxyoperator.components.LabelFactory
 import eu.openanalytics.shinyproxyoperator.components.ReplicaSetFactory
 import eu.openanalytics.shinyproxyoperator.crd.ShinyProxy
@@ -36,7 +37,9 @@ import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
 import mu.KotlinLogging
+import kotlin.concurrent.timer
 
 
 class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
@@ -45,10 +48,12 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
                            private val serviceController: ServiceController,
                            private val ingressController: IngressController,
                            private val reconcileListener: IReconcileListener?,
-                           private val recyclableChecker: IRecyclableChecker) {
+                           private val recyclableChecker: IRecyclableChecker,
+                           private val replicaSetStatusChecker: ReplicaSetStatusChecker) {
 
     private val configMapFactory = ConfigMapFactory(kubernetesClient)
     private val replicaSetFactory = ReplicaSetFactory(kubernetesClient)
+    private val eventFactory = EventFactory(kubernetesClient)
 
     private val logger = KotlinLogging.logger {}
 
@@ -59,7 +64,16 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
         private set
 
     suspend fun run(resourceRetriever: ResourceRetriever, shinyProxyLister: Lister<ShinyProxy>) {
-        scope.launch { scheduleAdditionalEvents() }
+        timer(period = 3_000L) {
+            runBlocking {
+                channel.send(ShinyProxyEvent(ShinyProxyEventType.CHECK_OBSOLETE_INSTANCES, null, null))
+            }
+        }
+        timer(period = 30_000L) {
+            runBlocking {
+                channel.send(ShinyProxyEvent(ShinyProxyEventType.CHECK_REPLICASET_STATUS, null, null))
+            }
+        }
         while (true) {
             try {
                 idle = true
@@ -82,6 +96,7 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
                             val newInstance = createNewInstance(event.shinyProxy)
                             reconcileSingleShinyProxyInstance(resourceRetriever, event.shinyProxy, newInstance)
                         }
+
                         ShinyProxyEventType.UPDATE_SPEC -> {
                             if (event.shinyProxy == null) {
                                 logger.warn { "Event of type UPDATE_SPEC should have shinyproxy attached to it." }
@@ -90,9 +105,11 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
                             val newInstance = createNewInstance(event.shinyProxy)
                             reconcileSingleShinyProxyInstance(resourceRetriever, event.shinyProxy, newInstance)
                         }
+
                         ShinyProxyEventType.DELETE -> {
                             // DELETE is not needed
                         }
+
                         ShinyProxyEventType.RECONCILE -> {
                             if (event.shinyProxy == null) {
                                 logger.warn { "Event of type RECONCILE should have shinyProxy attached to it." }
@@ -104,9 +121,9 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
                             }
                             reconcileSingleShinyProxyInstance(resourceRetriever, event.shinyProxy, event.shinyProxyInstance)
                         }
-                        ShinyProxyEventType.CHECK_OBSOLETE_INSTANCES -> {
-                            checkForObsoleteInstances(resourceRetriever, shinyProxyLister)
-                        }
+
+                        ShinyProxyEventType.CHECK_OBSOLETE_INSTANCES -> checkForObsoleteInstances(resourceRetriever, shinyProxyLister)
+                        ShinyProxyEventType.CHECK_REPLICASET_STATUS -> checkReplicaStatus(shinyProxyLister)
                     }
                 }
 
@@ -155,6 +172,8 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
             it.status.instances.add(newInstance)
         }
 
+        eventFactory.createNewInstanceEvent(shinyProxy, newInstance)
+
         return newInstance
     }
 
@@ -209,6 +228,7 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
             it.status.instances.forEach { inst -> inst.isLatestInstance = false }
             it.status.getInstanceByHash(latestInstance.hashOfSpec)?.isLatestInstance = true
         }
+        eventFactory.createInstanceReadyEvent(shinyProxy, latestInstance)
     }
 
     suspend fun reconcileSingleShinyProxyInstance(resourceRetriever: ResourceRetriever, _shinyProxy: ShinyProxy, _shinyProxyInstance: ShinyProxyInstance) {
@@ -301,14 +321,6 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
         }
     }
 
-    // TODO timer and extract from this class?
-    private suspend fun scheduleAdditionalEvents() {
-        while (true) {
-            channel.send(ShinyProxyEvent(ShinyProxyEventType.CHECK_OBSOLETE_INSTANCES, null, null))
-            delay(3000)
-        }
-    }
-
     suspend fun deleteSingleShinyProxyInstance(resourceRetriever: ResourceRetriever, shinyProxy: ShinyProxy, shinyProxyInstance: ShinyProxyInstance) {
         logger.info { "${shinyProxy.logPrefix(shinyProxyInstance)} DeleteSingleShinyProxyInstance [Step 1/3]: Update status" }
         // Important: update status BEFORE deleting, otherwise we will start reconciling this instance, before it's completely deleted
@@ -330,6 +342,17 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
         }
     }
 
+    private fun checkReplicaStatus(shinyProxyLister: Lister<ShinyProxy>) {
+        for (shinyProxy in shinyProxyLister.list()) {
+            for (shinyProxyInstance in shinyProxy.status.instances) {
+                val status = replicaSetStatusChecker.check(shinyProxy, shinyProxyInstance)
+                if (status.failed) {
+                    eventFactory.createInstanceFailed(shinyProxy, shinyProxyInstance, status.failureMessage, status.creationTimestamp)
+                }
+            }
+        }
+    }
+
     companion object {
         const val amountOfSteps: Int = 6
     }
diff --git a/src/main/kotlin/eu/openanalytics/shinyproxyoperator/controller/ShinyProxyEventType.kt b/src/main/kotlin/eu/openanalytics/shinyproxyoperator/controller/ShinyProxyEventType.kt
index 5bee4fb..bcb9cda 100644
--- a/src/main/kotlin/eu/openanalytics/shinyproxyoperator/controller/ShinyProxyEventType.kt
+++ b/src/main/kotlin/eu/openanalytics/shinyproxyoperator/controller/ShinyProxyEventType.kt
@@ -25,5 +25,6 @@ enum class ShinyProxyEventType {
     UPDATE_SPEC,
     DELETE,
     RECONCILE,
-    CHECK_OBSOLETE_INSTANCES
-}
\ No newline at end of file
+    CHECK_OBSOLETE_INSTANCES,
+    CHECK_REPLICASET_STATUS
+}