Skip to content

Commit

Permalink
[Fix][RayService] Use LRU cache for ServeConfigs (#2683)
Browse files Browse the repository at this point in the history
  • Loading branch information
MortalHappiness authored Dec 27, 2024
1 parent 3425b4b commit efbd35e
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
22 changes: 14 additions & 8 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/utils/lru"

networkingv1 "k8s.io/api/networking/v1"

Expand Down Expand Up @@ -57,9 +58,9 @@ type RayServiceReconciler struct {
Recorder record.EventRecorder
// Currently, the Ray dashboard doesn't cache the Serve application config.
// To avoid reapplying the same config repeatedly, cache the config in this map.
// Stores map of cacheKey to map of RayCluster name to Serve application config,
// where cacheKey is the combination of RayService namespace and name.
ServeConfigs cmap.ConcurrentMap[string, cmap.ConcurrentMap[string, string]]
// Cache key is the combination of RayService namespace and name.
// Cache value is map of RayCluster name to Serve application config.
ServeConfigs *lru.Cache
RayClusterDeletionTimestamps cmap.ConcurrentMap[string, time.Time]
dashboardClientFunc func() utils.RayDashboardClientInterface
httpProxyClientFunc func() utils.RayHttpProxyClientInterface
Expand All @@ -73,7 +74,7 @@ func NewRayServiceReconciler(_ context.Context, mgr manager.Manager, provider ut
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("rayservice-controller"),
ServeConfigs: cmap.New[cmap.ConcurrentMap[string, string]](),
ServeConfigs: lru.New(utils.ServeConfigLRUSize),
RayClusterDeletionTimestamps: cmap.New[time.Time](),

dashboardClientFunc: dashboardClientFunc,
Expand Down Expand Up @@ -540,10 +541,11 @@ func (r *RayServiceReconciler) cleanUpServeConfigCache(ctx context.Context, rayS
pendingRayClusterName := rayServiceInstance.Status.PendingServiceStatus.RayClusterName

cacheKey := rayServiceInstance.Namespace + "/" + rayServiceInstance.Name
serveConfigs, exist := r.ServeConfigs.Get(cacheKey)
cacheValue, exist := r.ServeConfigs.Get(cacheKey)
if !exist {
return
}
serveConfigs := cacheValue.(cmap.ConcurrentMap[string, string])

for key := range serveConfigs.Items() {
if key == activeRayClusterName || key == pendingRayClusterName {
Expand Down Expand Up @@ -965,10 +967,11 @@ func (r *RayServiceReconciler) getAndCheckServeStatus(ctx context.Context, dashb

func (r *RayServiceReconciler) getServeConfigFromCache(rayServiceInstance *rayv1.RayService, clusterName string) string {
cacheKey := rayServiceInstance.Namespace + "/" + rayServiceInstance.Name
serveConfigs, exist := r.ServeConfigs.Get(cacheKey)
cacheValue, exist := r.ServeConfigs.Get(cacheKey)
if !exist {
return ""
}
serveConfigs := cacheValue.(cmap.ConcurrentMap[string, string])
serveConfig, exist := serveConfigs.Get(clusterName)
if !exist {
return ""
Expand All @@ -982,10 +985,13 @@ func (r *RayServiceReconciler) cacheServeConfig(rayServiceInstance *rayv1.RaySer
return
}
cacheKey := rayServiceInstance.Namespace + "/" + rayServiceInstance.Name
rayServiceServeConfigs, exist := r.ServeConfigs.Get(cacheKey)
cacheValue, exist := r.ServeConfigs.Get(cacheKey)
var rayServiceServeConfigs cmap.ConcurrentMap[string, string]
if !exist {
rayServiceServeConfigs = cmap.New[string]()
r.ServeConfigs.Set(cacheKey, rayServiceServeConfigs)
r.ServeConfigs.Add(cacheKey, rayServiceServeConfigs)
} else {
rayServiceServeConfigs = cacheValue.(cmap.ConcurrentMap[string, string])
}
rayServiceServeConfigs.Set(clusterName, serveConfig)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import (
"testing"
"time"

cmap "github.com/orcaman/concurrent-map/v2"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/utils/lru"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand Down Expand Up @@ -669,7 +669,7 @@ func TestCheckIfNeedSubmitServeDeployment(t *testing.T) {
Client: fakeClient,
Recorder: &record.FakeRecorder{},
Scheme: scheme.Scheme,
ServeConfigs: cmap.New[cmap.ConcurrentMap[string, string]](),
ServeConfigs: lru.New(utils.ServeConfigLRUSize),
}

namespace := "ray"
Expand Down
2 changes: 2 additions & 0 deletions ray-operator/controllers/ray/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ const (

// KubeRayController represents the value of the default job controller
KubeRayController = "ray.io/kuberay-operator"

ServeConfigLRUSize = 1000
)

type ServiceType string
Expand Down

0 comments on commit efbd35e

Please sign in to comment.