Skip to content

Commit

Permalink
Ref #33488: implement basic events
Browse files Browse the repository at this point in the history
  • Loading branch information
LEDfan committed Aug 20, 2024
1 parent 82bd922 commit 75bb9d2
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import eu.openanalytics.shinyproxyoperator.controller.IRecyclableChecker
import eu.openanalytics.shinyproxyoperator.controller.IngressController
import eu.openanalytics.shinyproxyoperator.controller.PodRetriever
import eu.openanalytics.shinyproxyoperator.controller.RecyclableChecker
import eu.openanalytics.shinyproxyoperator.controller.ReplicaSetStatusChecker
import eu.openanalytics.shinyproxyoperator.controller.ResourceListener
import eu.openanalytics.shinyproxyoperator.controller.ResourceRetriever
import eu.openanalytics.shinyproxyoperator.controller.ShinyProxyController
Expand Down Expand Up @@ -81,6 +82,7 @@ class Operator(client: NamespacedKubernetesClient? = null,
private val podRetriever: PodRetriever
private val shinyProxyClient: ShinyProxyClient
private val recyclableChecker: IRecyclableChecker
private val replicaSetStatusChecker: ReplicaSetStatusChecker

private val shinyProxyListener: ShinyProxyListener
private val replicaSetListener: ResourceListener<ReplicaSet, ReplicaSetList, RollableScalableResource<ReplicaSet>>
Expand Down Expand Up @@ -151,6 +153,7 @@ class Operator(client: NamespacedKubernetesClient? = null,
shinyProxyListener = ShinyProxyListener(sendChannel, this.shinyProxyClient)
podRetriever = PodRetriever(this.client)
this.recyclableChecker = recyclableChecker ?: RecyclableChecker(podRetriever)
replicaSetStatusChecker = ReplicaSetStatusChecker(podRetriever)

if (this.mode == Mode.CLUSTERED) {
replicaSetListener = ResourceListener(sendChannel, this.client.inAnyNamespace().apps().replicaSets())
Expand All @@ -172,7 +175,7 @@ class Operator(client: NamespacedKubernetesClient? = null,
/**
* Controllers
*/
val shinyProxyController = ShinyProxyController(channel, this.client, shinyProxyClient, serviceController, ingressController, reconcileListener, this.recyclableChecker)
val shinyProxyController = ShinyProxyController(channel, this.client, shinyProxyClient, serviceController, ingressController, reconcileListener, this.recyclableChecker, this.replicaSetStatusChecker)

private fun _checkCrdExists(name: String, shortName: String) {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
* ShinyProxy-Operator
*
* Copyright (C) 2021-2024 Open Analytics
*
* ===========================================================================
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Apache License as published by
* The Apache Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Apache License for more details.
*
* You should have received a copy of the Apache License
* along with this program. If not, see <http://www.apache.org/licenses/>
*/
package eu.openanalytics.shinyproxyoperator.components

import eu.openanalytics.shinyproxyoperator.crd.ShinyProxy
import eu.openanalytics.shinyproxyoperator.crd.ShinyProxyInstance
import io.fabric8.kubernetes.api.model.EventBuilder
import io.fabric8.kubernetes.client.KubernetesClient
import mu.KotlinLogging
import java.time.Instant
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter


/**
 * Creates Kubernetes `Event` objects that describe the lifecycle of a ShinyProxy instance.
 *
 * Every event is created in the namespace of the [ShinyProxy] resource, references that resource
 * both as owner and as involved object, and carries the labels of the [ShinyProxyInstance] so that
 * the events of one instance can be looked up again (see [createInstanceFailed]).
 */
class EventFactory(private val kubeClient: KubernetesClient) {

    private val logger = KotlinLogging.logger {}

    /** Emits a `Normal` event announcing that a new instance is being started. */
    fun createNewInstanceEvent(shinyProxy: ShinyProxy, shinyProxyInstance: ShinyProxyInstance) {
        createEvent(shinyProxy, shinyProxyInstance, TYPE_NORMAL, "StartingNewInstance", "Configuration changed")
    }

    /** Emits a `Normal` event announcing that the instance is ready. */
    fun createInstanceReadyEvent(shinyProxy: ShinyProxy, shinyProxyInstance: ShinyProxyInstance) {
        createEvent(shinyProxy, shinyProxyInstance, TYPE_NORMAL, "InstanceReady", "ShinyProxy ready")
    }

    /**
     * Builds an event and creates it through the Kubernetes API.
     *
     * @param type event type, `Normal` or `Warning`
     * @param action machine-readable name of the action that was taken or that failed
     * @param reason short description of why the action was taken (the original author notes a 128 character limit)
     * @param message optional longer description; only set on the event when not null
     */
    private fun createEvent(shinyProxy: ShinyProxy, shinyProxyInstance: ShinyProxyInstance, type: String, action: String, reason: String, message: String? = null) {
        //@formatter:off
        val eventBuilder = EventBuilder()
            .withNewMetadata()
                // NOTE(review): "test-event" looks like a placeholder prefix - confirm the intended event name.
                .withGenerateName("test-event")
                .withNamespace(shinyProxy.metadata.namespace)
                .addNewOwnerReference()
                    .withController(true)
                    .withKind("ShinyProxy")
                    .withApiVersion("openanalytics.eu/v1")
                    .withName(shinyProxy.metadata.name)
                    .withUid(shinyProxy.metadata.uid)
                .endOwnerReference()
                .withLabels<String, String>(LabelFactory.labelsForShinyProxyInstance(shinyProxy, shinyProxyInstance))
            .endMetadata()
            .withNewInvolvedObject()
                .withKind("ShinyProxy")
                .withApiVersion("openanalytics.eu/v1")
                .withName(shinyProxy.metadata.name)
                .withNamespace(shinyProxy.metadata.namespace)
                .withUid(shinyProxy.metadata.uid)
            .endInvolvedObject()
            .withNewEventTime(K8S_MICRO_TIME.format(ZonedDateTime.now()))
            .withReportingInstance("shinyproxy-operator") // TODO
            .withReportingComponent("shinyproxy-operator")
            .withAction(action) // which action was taken/failed, machine-readable
            .withType(type) // Warning or Normal
            .withReason(reason) // why the action was taken, 128 characters
        //@formatter:on

        if (message != null) {
            eventBuilder.withMessage(message)
        }

        kubeClient.v1().events().resource(eventBuilder.build()).create()
    }

    /**
     * Limits [message] to [MAX_MESSAGE_LENGTH] characters; a longer message is cut and suffixed
     * with [TRUNCATED_SUFFIX] so that the result is exactly [MAX_MESSAGE_LENGTH] characters long.
     */
    private fun truncateMessage(message: String?): String? {
        if (message == null || message.length <= MAX_MESSAGE_LENGTH) {
            return message
        }
        return message.substring(0, MAX_MESSAGE_LENGTH - TRUNCATED_SUFFIX.length) + TRUNCATED_SUFFIX
    }

    /**
     * Tells whether an event with the given [eventTime] was reported after [threshold].
     *
     * Without a [threshold] there is no lower bound, so every event qualifies; an event without
     * an event time can never be proven to be newer than the threshold and therefore does not qualify.
     */
    private fun isReportedAfter(eventTime: String?, threshold: Instant?): Boolean {
        if (threshold == null) {
            return true
        }
        if (eventTime == null) {
            return false
        }
        return Instant.parse(eventTime) > threshold
    }

    /**
     * Emits a `Warning` event telling that the instance failed to start, unless an identical
     * event (same action, type and message) was already reported after [creationTimestamp].
     *
     * @param message failure description, truncated before it is compared and stored
     * @param creationTimestamp lower bound for the event time of an existing event to count as duplicate;
     *        when null every matching event counts as duplicate
     */
    fun createInstanceFailed(shinyProxy: ShinyProxy, shinyProxyInstance: ShinyProxyInstance, message: String?, creationTimestamp: Instant?) {
        // Look in the namespace the events are created in (see createEvent); the default namespace
        // of the client is not necessarily the namespace of this ShinyProxy resource.
        val events = kubeClient.v1().events()
            .inNamespace(shinyProxy.metadata.namespace)
            .withLabels(LabelFactory.labelsForShinyProxyInstance(shinyProxy, shinyProxyInstance))
            .list()
        val truncatedMessage = truncateMessage(message)
        val alreadyReported = events.items.any {
            it.action == ACTION_START_FAILED
                && it.type == TYPE_WARNING
                && it.message == truncatedMessage
                && isReportedAfter(it.eventTime?.time, creationTimestamp)
        }
        if (alreadyReported) {
            return
        }
        logger.warn { "${shinyProxy.logPrefix(shinyProxyInstance)} Pods are failing: ${message?.replace("\n", "")}" }
        createEvent(shinyProxy, shinyProxyInstance, TYPE_WARNING, ACTION_START_FAILED, "ShinyProxy failed to start", truncatedMessage)
    }

    private companion object {
        const val TYPE_NORMAL = "Normal"
        const val TYPE_WARNING = "Warning"
        const val ACTION_START_FAILED = "StartingNewInstanceFailed"
        const val MAX_MESSAGE_LENGTH = 1024
        const val TRUNCATED_SUFFIX = " [truncated]"

        // Timestamp with microsecond precision, as used for the eventTime of an event.
        // Built once: DateTimeFormatter is immutable and safe to share.
        val K8S_MICRO_TIME: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'.'SSSSSSXXX")
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class PodTemplateSpecFactory {
.withFailureThreshold(6)
.withPeriodSeconds(5)
.endStartupProbe()
.withTerminationMessagePolicy("FallbackToLogsOnError")
.endContainer()
.withAffinity(createAffinity(shinyProxy, shinyProxyInstance))
.withVolumes(VolumeBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* ShinyProxy-Operator
*
* Copyright (C) 2021-2024 Open Analytics
*
* ===========================================================================
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Apache License as published by
* The Apache Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Apache License for more details.
*
* You should have received a copy of the Apache License
* along with this program. If not, see <http://www.apache.org/licenses/>
*/
package eu.openanalytics.shinyproxyoperator.controller

import eu.openanalytics.shinyproxyoperator.crd.ShinyProxy
import eu.openanalytics.shinyproxyoperator.crd.ShinyProxyInstance
import java.time.Instant

/**
 * Inspects the pods of a ShinyProxy instance and reports whether its `shinyproxy` container is failing.
 */
class ReplicaSetStatusChecker(private val podRetriever: PodRetriever) {

    /**
     * Outcome of a check.
     *
     * @property failed true when a failing `shinyproxy` container was found
     * @property failureMessage message of the failure, null when [failed] is false
     * @property creationTimestamp creation time of the failing pod, null when [failed] is false
     */
    data class Status(val failed: Boolean, val failureMessage: String?, val creationTimestamp: Instant?)

    /**
     * Looks for the first pod of the given instance whose `shinyproxy` container is not ready
     * and has been restarted at least once.
     *
     * Pods without container statuses, or without a status for the `shinyproxy` container,
     * are ignored. When there are no pods at all the instance is not ready yet, but not failed.
     *
     * @return a failed [Status] carrying the termination message of the previous container run
     *         ("Unknown error" when there is none) and the creation time of the pod,
     *         or a non-failed [Status] when no such pod exists
     */
    fun check(shinyProxy: ShinyProxy, shinyProxyInstance: ShinyProxyInstance): Status {
        for (candidate in podRetriever.getShinyProxyPods(shinyProxy, shinyProxyInstance)) {
            // an empty status list yields null here as well, so both cases are skipped at once
            val container = candidate.status.containerStatuses.firstOrNull { it.name == "shinyproxy" }
                ?: continue
            if (container.ready || container.restartCount < 1) {
                // ready, or never restarted so far: not considered a failure
                continue
            }
            return Status(
                true,
                container.lastState?.terminated?.message ?: "Unknown error",
                Instant.parse(candidate.metadata.creationTimestamp)
            )
        }
        return Status(false, null, null)
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package eu.openanalytics.shinyproxyoperator.controller

import eu.openanalytics.shinyproxyoperator.ShinyProxyClient
import eu.openanalytics.shinyproxyoperator.components.ConfigMapFactory
import eu.openanalytics.shinyproxyoperator.components.EventFactory
import eu.openanalytics.shinyproxyoperator.components.LabelFactory
import eu.openanalytics.shinyproxyoperator.components.ReplicaSetFactory
import eu.openanalytics.shinyproxyoperator.crd.ShinyProxy
Expand All @@ -36,7 +37,9 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import kotlin.concurrent.timer


class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
Expand All @@ -45,10 +48,12 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
private val serviceController: ServiceController,
private val ingressController: IngressController,
private val reconcileListener: IReconcileListener?,
private val recyclableChecker: IRecyclableChecker) {
private val recyclableChecker: IRecyclableChecker,
private val replicaSetStatusChecker: ReplicaSetStatusChecker) {

private val configMapFactory = ConfigMapFactory(kubernetesClient)
private val replicaSetFactory = ReplicaSetFactory(kubernetesClient)
private val eventFactory = EventFactory(kubernetesClient)

private val logger = KotlinLogging.logger {}

Expand All @@ -59,7 +64,16 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
private set

suspend fun run(resourceRetriever: ResourceRetriever, shinyProxyLister: Lister<ShinyProxy>) {
scope.launch { scheduleAdditionalEvents() }
timer(period = 3_000L) {
runBlocking {
channel.send(ShinyProxyEvent(ShinyProxyEventType.CHECK_OBSOLETE_INSTANCES, null, null))
}
}
timer(period = 30_000L) {
runBlocking {
channel.send(ShinyProxyEvent(ShinyProxyEventType.CHECK_REPLICASET_STATUS, null, null))
}
}
while (true) {
try {
idle = true
Expand All @@ -82,6 +96,7 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
val newInstance = createNewInstance(event.shinyProxy)
reconcileSingleShinyProxyInstance(resourceRetriever, event.shinyProxy, newInstance)
}

ShinyProxyEventType.UPDATE_SPEC -> {
if (event.shinyProxy == null) {
logger.warn { "Event of type UPDATE_SPEC should have shinyproxy attached to it." }
Expand All @@ -90,9 +105,11 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
val newInstance = createNewInstance(event.shinyProxy)
reconcileSingleShinyProxyInstance(resourceRetriever, event.shinyProxy, newInstance)
}

ShinyProxyEventType.DELETE -> {
// DELETE is not needed
}

ShinyProxyEventType.RECONCILE -> {
if (event.shinyProxy == null) {
logger.warn { "Event of type RECONCILE should have shinyProxy attached to it." }
Expand All @@ -104,9 +121,9 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
}
reconcileSingleShinyProxyInstance(resourceRetriever, event.shinyProxy, event.shinyProxyInstance)
}
ShinyProxyEventType.CHECK_OBSOLETE_INSTANCES -> {
checkForObsoleteInstances(resourceRetriever, shinyProxyLister)
}

ShinyProxyEventType.CHECK_OBSOLETE_INSTANCES -> checkForObsoleteInstances(resourceRetriever, shinyProxyLister)
ShinyProxyEventType.CHECK_REPLICASET_STATUS -> checkReplicaStatus(shinyProxyLister)
}
}

Expand Down Expand Up @@ -155,6 +172,8 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
it.status.instances.add(newInstance)
}

eventFactory.createNewInstanceEvent(shinyProxy, newInstance)

return newInstance
}

Expand Down Expand Up @@ -209,6 +228,7 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
it.status.instances.forEach { inst -> inst.isLatestInstance = false }
it.status.getInstanceByHash(latestInstance.hashOfSpec)?.isLatestInstance = true
}
eventFactory.createInstanceReadyEvent(shinyProxy, latestInstance)
}

suspend fun reconcileSingleShinyProxyInstance(resourceRetriever: ResourceRetriever, _shinyProxy: ShinyProxy, _shinyProxyInstance: ShinyProxyInstance) {
Expand Down Expand Up @@ -301,14 +321,6 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
}
}

// TODO timer and extract from this class?
/**
 * Endlessly sends a CHECK_OBSOLETE_INSTANCES event to the channel, pausing 3000 ms after each send.
 */
private suspend fun scheduleAdditionalEvents() {
    val pauseMillis = 3000L
    do {
        channel.send(ShinyProxyEvent(ShinyProxyEventType.CHECK_OBSOLETE_INSTANCES, null, null))
        delay(pauseMillis)
    } while (true)
}

suspend fun deleteSingleShinyProxyInstance(resourceRetriever: ResourceRetriever, shinyProxy: ShinyProxy, shinyProxyInstance: ShinyProxyInstance) {
logger.info { "${shinyProxy.logPrefix(shinyProxyInstance)} DeleteSingleShinyProxyInstance [Step 1/3]: Update status" }
// Important: update status BEFORE deleting, otherwise we will start reconciling this instance, before it's completely deleted
Expand All @@ -330,6 +342,17 @@ class ShinyProxyController(private val channel: Channel<ShinyProxyEvent>,
}
}

/**
 * Checks every instance of every listed ShinyProxy resource and reports the failing ones
 * through the event factory.
 */
private fun checkReplicaStatus(shinyProxyLister: Lister<ShinyProxy>) {
    shinyProxyLister.list().forEach { proxy ->
        proxy.status.instances.forEach { instance ->
            // only a failed status results in an event
            replicaSetStatusChecker.check(proxy, instance)
                .takeIf { it.failed }
                ?.let { failure ->
                    eventFactory.createInstanceFailed(proxy, instance, failure.failureMessage, failure.creationTimestamp)
                }
        }
    }
}

companion object {
const val amountOfSteps: Int = 6
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ enum class ShinyProxyEventType {
UPDATE_SPEC,
DELETE,
RECONCILE,
CHECK_OBSOLETE_INSTANCES
}
CHECK_OBSOLETE_INSTANCES,
CHECK_REPLICASET_STATUS
}

0 comments on commit 75bb9d2

Please sign in to comment.