Skip to content

Commit

Permalink
feat: support max number of concurrent resource operation executions (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
liu-hm19 authored Sep 23, 2024
1 parent 0b7337c commit bccd483
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 4 deletions.
10 changes: 10 additions & 0 deletions pkg/apis/api.kusion.io/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,3 +734,13 @@ type Release struct {
// ModifiedTime is the time that the Release is modified.
ModifiedTime time.Time `yaml:"modifiedTime" json:"modifiedTime"`
}

const (
// Environment variable for maximum number of concurrent resource executions,
// including preview, apply and destroy.
// Note that the maximum number should be between 1 to 100.
MaxConcurrentEnvVar = "KUSION_EXEC_MAX_CONCURRENT"

// The default maximum number of concurrent resource executions for Kusion is 10.
DefaultMaxConcurrent = 10
)
5 changes: 5 additions & 0 deletions pkg/engine/operation/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ func (ao *ApplyOperation) Apply(req *ApplyRequest) (rsp *ApplyResponse, s v1.Sta
return nil, s
}

// Update the operation semaphore.
if err := o.UpdateSemaphore(); err != nil {
return nil, v1.NewErrorStatus(err)
}

// 1. init & build Indexes
priorState := req.Release.State
priorStateResourceIndex := priorState.Resources.Index()
Expand Down
4 changes: 0 additions & 4 deletions pkg/engine/operation/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"kusionstack.io/kusion/pkg/engine/runtime"
runtimeinit "kusionstack.io/kusion/pkg/engine/runtime/init"
"kusionstack.io/kusion/pkg/engine/runtime/kubernetes"
"kusionstack.io/kusion/pkg/infra/util/semaphore"
)

func TestApplyOperation_Apply(t *testing.T) {
Expand All @@ -33,7 +32,6 @@ func TestApplyOperation_Apply(t *testing.T) {
msgCh chan models.Message
release *apiv1.Release
lock *sync.Mutex
sem *semaphore.Semaphore
}
type args struct {
applyRequest *ApplyRequest
Expand Down Expand Up @@ -113,7 +111,6 @@ func TestApplyOperation_Apply(t *testing.T) {
releaseStorage: &storages.LocalStorage{},
runtimeMap: map[apiv1.Type]runtime.Runtime{runtime.Kubernetes: &kubernetes.KubernetesRuntime{}},
msgCh: make(chan models.Message, 5),
sem: semaphore.New(10),
},
args: args{applyRequest: &ApplyRequest{
Request: models.Request{
Expand Down Expand Up @@ -141,7 +138,6 @@ func TestApplyOperation_Apply(t *testing.T) {
MsgCh: tc.fields.msgCh,
Release: tc.fields.release,
Lock: tc.fields.lock,
Sem: tc.fields.sem,
}
ao := &ApplyOperation{
Operation: *o,
Expand Down
5 changes: 5 additions & 0 deletions pkg/engine/operation/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ func (do *DestroyOperation) Destroy(req *DestroyRequest) (rsp *DestroyResponse,
return nil, s
}

// Update the operation semaphore.
if err := o.UpdateSemaphore(); err != nil {
return nil, v1.NewErrorStatus(err)
}

// 1. init & build Indexes
priorState := req.Release.State
priorStateResourceIndex := priorState.Resources.Index()
Expand Down
21 changes: 21 additions & 0 deletions pkg/engine/operation/models/operation_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package models

import (
"fmt"
"os"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -124,3 +126,22 @@ func (o *Operation) UpdateReleaseState() error {
log.Infof("update release succeeded, project %s, workspace %s, revision %d", o.Release.Project, o.Release.Workspace, o.Release.Revision)
return nil
}

// Update the operation semaphore with the maximum number of concurrent resource executions.
func (o *Operation) UpdateSemaphore() error {
v := os.Getenv(apiv1.MaxConcurrentEnvVar)
if v != "" {
maxConcurrent, err := strconv.Atoi(v)
if err != nil {
return err
}
if maxConcurrent < 1 || maxConcurrent > 100 {
return fmt.Errorf("invalid value for maximum number of concurrent resource executions: %d", maxConcurrent)
}
o.Sem = semaphore.New(int64(maxConcurrent))
} else {
o.Sem = semaphore.New(int64(apiv1.DefaultMaxConcurrent))
}

return nil
}
65 changes: 65 additions & 0 deletions pkg/engine/operation/models/operation_context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package models

import (
"errors"
"os"
"testing"

"github.com/stretchr/testify/assert"
apiv1 "kusionstack.io/kusion/pkg/apis/api.kusion.io/v1"
"kusionstack.io/kusion/pkg/infra/util/semaphore"
)

func TestOperation_UpdateSemaphore(t *testing.T) {
original := os.Getenv(apiv1.MaxConcurrentEnvVar)
defer os.Setenv(apiv1.MaxConcurrentEnvVar, original)

testcases := []struct {
name string
env string
expectedErr error
expectedVal int64
}{
{
name: "Invalid Env Type",
env: "not-a-number",
expectedErr: errors.New("invalid syntax"),
},
{
name: "Invalid Value (less than 0)",
env: "-1",
expectedErr: errors.New("invalid value"),
},
{
name: "Invalid Value (larger than 100)",
env: "200",
expectedErr: errors.New("invalid value"),
},
{
name: "Default Value",
env: "",
expectedErr: nil,
expectedVal: int64(apiv1.DefaultMaxConcurrent),
},
{
name: "Customized Value",
env: "50",
expectedErr: nil,
expectedVal: int64(50),
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
op := &Operation{}
os.Setenv(apiv1.MaxConcurrentEnvVar, tc.env)
err := op.UpdateSemaphore()
if tc.expectedErr != nil {
assert.ErrorContains(t, err, tc.expectedErr.Error())
} else {
assert.Nil(t, err)
assert.Equal(t, *semaphore.New(tc.expectedVal), *op.Sem)
}
})
}
}
5 changes: 5 additions & 0 deletions pkg/engine/operation/preview.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ func (po *PreviewOperation) Preview(req *PreviewRequest) (rsp *PreviewResponse,
return nil, s
}

// Update the operation semaphore.
if err := o.UpdateSemaphore(); err != nil {
return nil, v1.NewErrorStatus(err)
}

// 1. init & build Indexes
priorState := req.State

Expand Down

0 comments on commit bccd483

Please sign in to comment.