Skip to content

Commit

Permalink
feat: add CIDR affinity to searcher (#2111)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Feb 23, 2023
1 parent 2b0c8f3 commit e0ce8d8
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 51 deletions.
6 changes: 6 additions & 0 deletions api/manager/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5052,6 +5052,12 @@ const docTemplate = `{
"d7y_io_dragonfly_v2_manager_types.SchedulerClusterScopes": {
"type": "object",
"properties": {
"cidrs": {
"type": "array",
"items": {
"type": "string"
}
},
"idc": {
"type": "string"
},
Expand Down
6 changes: 6 additions & 0 deletions api/manager/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -5045,6 +5045,12 @@
"d7y_io_dragonfly_v2_manager_types.SchedulerClusterScopes": {
"type": "object",
"properties": {
"cidrs": {
"type": "array",
"items": {
"type": "string"
}
},
"idc": {
"type": "string"
},
Expand Down
4 changes: 4 additions & 0 deletions api/manager/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,10 @@ definitions:
type: object
d7y_io_dragonfly_v2_manager_types.SchedulerClusterScopes:
properties:
cidrs:
items:
type: string
type: array
idc:
type: string
location:
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
github.com/swaggo/files v0.0.0-20220728132757-551d4a08d97a
github.com/swaggo/gin-swagger v1.5.3
github.com/swaggo/swag v1.8.9
github.com/yl2chen/cidranger v1.0.2
go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.37.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.37.0
go.opentelemetry.io/otel v1.13.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,8 @@ github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yl2chen/cidranger v1.0.2 h1:lbOWZVCG1tCRX4u24kuM1Tb4nHqWkDxwLdoS+SevawU=
github.com/yl2chen/cidranger v1.0.2/go.mod h1:9U1yz7WPYDwf0vpNWFaeRh0bjwz5RVgRy/9UEQfHl0g=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
8 changes: 4 additions & 4 deletions manager/searcher/mocks/searcher_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions manager/searcher/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
"path"
"testing"

testifyassert "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/assert"
)

func TestLoadPlugin(t *testing.T) {
assert := testifyassert.New(t)
assert := assert.New(t)
defer func() {
os.Remove("./testdata/d7y-manager-plugin-searcher.so")
os.Remove("./testdata/test")
Expand Down
98 changes: 67 additions & 31 deletions manager/searcher/searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"context"
"errors"
"fmt"
"net"
"sort"
"strings"

"github.com/mitchellh/mapstructure"
"github.com/yl2chen/cidranger"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/manager/model"
Expand All @@ -45,17 +47,20 @@ const (
)

const (
// clusterTypeWeight cluster type weight.
clusterTypeWeight float64 = 0.03
// securityDomainAffinityWeight is security domain affinity weight.
securityDomainAffinityWeight float64 = 0.4

// SecurityDomain affinity weight.
securityDomainAffinityWeight float64 = 0.5
// cidrAffinityWeight is CIDR affinity weight.
cidrAffinityWeight float64 = 0.3

// IDC affinity weight.
idcAffinityWeight float64 = 0.35
// idcAffinityWeight is IDC affinity weight.
idcAffinityWeight float64 = 0.15

// Location affinity weight.
locationAffinityWeight = 0.12
// locationAffinityWeight is location affinity weight.
locationAffinityWeight = 0.1

// clusterTypeWeight is cluster type weight.
clusterTypeWeight float64 = 0.05
)

const (
Expand All @@ -73,16 +78,19 @@ const (

// Scheduler cluster scopes.
type Scopes struct {
IDC string `mapstructure:"idc"`
Location string `mapstructure:"location"`
IDC string `mapstructure:"idc"`
Location string `mapstructure:"location"`
CIDRs []string `mapstructure:"cidrs"`
}

type Searcher interface {
// FindSchedulerClusters finds scheduler clusters that best matches the evaluation.
FindSchedulerClusters(ctx context.Context, schedulerClusters []model.SchedulerCluster, hostname, ip string, conditions map[string]string) ([]model.SchedulerCluster, error)
FindSchedulerClusters(ctx context.Context, schedulerClusters []model.SchedulerCluster, ip, hostname string, conditions map[string]string) ([]model.SchedulerCluster, error)
}

type searcher struct{}
type searcher struct {
cidrs []string
}

func New(pluginDir string) Searcher {
s, err := LoadPlugin(pluginDir)
Expand All @@ -96,11 +104,7 @@ func New(pluginDir string) Searcher {
}

// FindSchedulerClusters finds scheduler clusters that best matches the evaluation.
func (s *searcher) FindSchedulerClusters(ctx context.Context, schedulerClusters []model.SchedulerCluster, hostname, ip string, conditions map[string]string) ([]model.SchedulerCluster, error) {
if len(conditions) <= 0 {
return nil, errors.New("empty conditions")
}

func (s *searcher) FindSchedulerClusters(ctx context.Context, schedulerClusters []model.SchedulerCluster, ip, hostname string, conditions map[string]string) ([]model.SchedulerCluster, error) {
if len(schedulerClusters) <= 0 {
return nil, errors.New("empty scheduler clusters")
}
Expand All @@ -124,7 +128,7 @@ func (s *searcher) FindSchedulerClusters(ctx context.Context, schedulerClusters
return false
}

return Evaluate(conditions, si, clusters[i]) > Evaluate(conditions, sj, clusters[j])
return Evaluate(ip, hostname, conditions, si, clusters[i]) > Evaluate(ip, hostname, conditions, sj, clusters[j])
},
)

Expand Down Expand Up @@ -173,20 +177,12 @@ func FilterSchedulerClusters(conditions map[string]string, schedulerClusters []m
}

// Evaluate the degree of matching between scheduler cluster and dfdaemon.
func Evaluate(conditions map[string]string, scopes Scopes, cluster model.SchedulerCluster) float64 {
return clusterTypeWeight*calculateClusterTypeScore(cluster) +
securityDomainAffinityWeight*calculateSecurityDomainAffinityScore(conditions[ConditionSecurityDomain], cluster.SecurityGroup.SecurityRules) +
func Evaluate(ip, hostname string, conditions map[string]string, scopes Scopes, cluster model.SchedulerCluster) float64 {
return securityDomainAffinityWeight*calculateSecurityDomainAffinityScore(conditions[ConditionSecurityDomain], cluster.SecurityGroup.SecurityRules) +
cidrAffinityWeight*calculateCIDRAffinityScore(ip, scopes.CIDRs) +
idcAffinityWeight*calculateIDCAffinityScore(conditions[ConditionIDC], scopes.IDC) +
locationAffinityWeight*calculateMultiElementAffinityScore(conditions[ConditionLocation], scopes.Location)
}

// calculateClusterTypeScore 0.0~1.0 larger and better.
func calculateClusterTypeScore(cluster model.SchedulerCluster) float64 {
if cluster.IsDefault {
return maxScore
}

return minScore
locationAffinityWeight*calculateMultiElementAffinityScore(conditions[ConditionLocation], scopes.Location) +
clusterTypeWeight*calculateClusterTypeScore(cluster)
}

// calculateSecurityDomainAffinityScore 0.0~1.0 larger and better.
Expand All @@ -202,6 +198,37 @@ func calculateSecurityDomainAffinityScore(securityDomain string, securityRules [
return maxScore
}

// calculateCIDRAffinityScore 0.0~1.0 larger and better.
func calculateCIDRAffinityScore(ip string, cidrs []string) float64 {
// Construct CIDR ranger.
ranger := cidranger.NewPCTrieRanger()
for _, cidr := range cidrs {
_, network, err := net.ParseCIDR(cidr)
if err != nil {
logger.Error(err)
continue
}

if err := ranger.Insert(cidranger.NewBasicRangerEntry(*network)); err != nil {
logger.Error(err)
continue
}
}

// Determine whether an IP is contained in the constructed networks ranger.
contains, err := ranger.Contains(net.ParseIP(ip))
if err != nil {
logger.Error(err)
return minScore
}

if !contains {
return minScore
}

return maxScore
}

// calculateIDCAffinityScore 0.0~1.0 larger and better.
func calculateIDCAffinityScore(dst, src string) float64 {
if dst == "" || src == "" {
Expand Down Expand Up @@ -255,3 +282,12 @@ func calculateMultiElementAffinityScore(dst, src string) float64 {

return float64(score) / float64(maxElementLen)
}

// calculateClusterTypeScore 0.0~1.0 larger and better.
func calculateClusterTypeScore(cluster model.SchedulerCluster) float64 {
if cluster.IsDefault {
return maxScore
}

return minScore
}
53 changes: 42 additions & 11 deletions manager/searcher/searcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,14 @@ import (
"d7y.io/dragonfly/v2/manager/model"
)

func TestSchedulerCluster(t *testing.T) {
func TestSearcher_FindSchedulerClusters(t *testing.T) {
pluginDir := "."
tests := []struct {
name string
schedulerClusters []model.SchedulerCluster
conditions map[string]string
expect func(t *testing.T, data []model.SchedulerCluster, err error)
}{
{
name: "conditions is empty",
schedulerClusters: []model.SchedulerCluster{{Name: "foo"}},
conditions: map[string]string{},
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.EqualError(err, "empty conditions")
},
},
{
name: "scheduler clusters is empty",
schedulerClusters: []model.SchedulerCluster{},
Expand Down Expand Up @@ -284,6 +275,40 @@ func TestSchedulerCluster(t *testing.T) {
assert.Equal(len(data), 2)
},
},
{
name: "match according to cidr condition",
schedulerClusters: []model.SchedulerCluster{
{
Name: "foo",
Scopes: map[string]any{
"cidrs": []string{"128.168.1.0/24"},
},
Schedulers: []model.Scheduler{
{
HostName: "foo",
State: "active",
},
},
},
{
Name: "bar",
Scopes: map[string]any{},
Schedulers: []model.Scheduler{
{
HostName: "bar",
State: "active",
},
},
},
},
conditions: map[string]string{},
expect: func(t *testing.T, data []model.SchedulerCluster, err error) {
assert := assert.New(t)
assert.Equal(data[0].Name, "foo")
assert.Equal(data[1].Name, "bar")
assert.Equal(len(data), 2)
},
},
{
name: "match according to location and idc condition",
schedulerClusters: []model.SchedulerCluster{
Expand Down Expand Up @@ -596,6 +621,7 @@ func TestSchedulerCluster(t *testing.T) {
Scopes: map[string]any{
"idc": "IDC-1",
"location": "LOCATION-2",
"cidrs": []string{"128.168.1.0/24"},
},
SecurityGroup: model.SecurityGroup{
SecurityRules: []model.SecurityRule{
Expand All @@ -616,6 +642,7 @@ func TestSchedulerCluster(t *testing.T) {
Scopes: map[string]any{
"idc": "IDC-1",
"location": "LOCATION-1",
"cidrs": []string{"128.168.1.0/24"},
},
SecurityGroup: model.SecurityGroup{
SecurityRules: []model.SecurityRule{
Expand All @@ -636,6 +663,7 @@ func TestSchedulerCluster(t *testing.T) {
Scopes: map[string]any{
"idc": "IDC-1",
"location": "LOCATION-1|LOCATION-2",
"cidrs": []string{"128.168.1.0/24"},
},
Schedulers: []model.Scheduler{
{
Expand All @@ -649,6 +677,7 @@ func TestSchedulerCluster(t *testing.T) {
Scopes: map[string]any{
"idc": "IDC-1",
"location": "LOCATION-2",
"cidrs": []string{"128.168.1.0/24"},
},
Schedulers: []model.Scheduler{
{
Expand All @@ -663,6 +692,7 @@ func TestSchedulerCluster(t *testing.T) {
Scopes: map[string]any{
"idc": "IDC-1",
"location": "LOCATION-2",
"cidrs": []string{"128.168.1.0/24"},
},
SecurityGroup: model.SecurityGroup{
SecurityRules: []model.SecurityRule{
Expand All @@ -683,6 +713,7 @@ func TestSchedulerCluster(t *testing.T) {
Scopes: map[string]any{
"idc": "IDC-1",
"location": "LOCATION-2",
"cidrs": []string{"128.168.1.0/24"},
},
SecurityGroup: model.SecurityGroup{
SecurityRules: []model.SecurityRule{
Expand Down Expand Up @@ -720,7 +751,7 @@ func TestSchedulerCluster(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
searcher := New(pluginDir)
clusters, found := searcher.FindSchedulerClusters(context.Background(), tc.schedulerClusters, "foo", "127.0.0.1", tc.conditions)
clusters, found := searcher.FindSchedulerClusters(context.Background(), tc.schedulerClusters, "128.168.1.0", "foo", tc.conditions)
tc.expect(t, clusters, found)
})
}
Expand Down
2 changes: 1 addition & 1 deletion manager/searcher/testdata/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func main() {
os.Exit(1)
}

clusters, err := s.FindSchedulerClusters(context.Background(), []model.SchedulerCluster{}, "foo", "127.0.0.1", map[string]string{})
clusters, err := s.FindSchedulerClusters(context.Background(), []model.SchedulerCluster{}, "127.0.0.1", "foo", map[string]string{})
if err != nil {
fmt.Println("scheduler cluster not found")
os.Exit(1)
Expand Down
5 changes: 3 additions & 2 deletions manager/types/scheduler_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type SchedulerClusterClientConfig struct {
}

type SchedulerClusterScopes struct {
IDC string `yaml:"idc" mapstructure:"idc" json:"idc" binding:"omitempty"`
Location string `yaml:"location" mapstructure:"location" json:"location" binding:"omitempty"`
IDC string `yaml:"idc" mapstructure:"idc" json:"idc" binding:"omitempty"`
Location string `yaml:"location" mapstructure:"location" json:"location" binding:"omitempty"`
CIDRs []string `yaml:"cidrs" mapstructure:"cidrs" json:"cidrs" binding:"omitempty"`
}

0 comments on commit e0ce8d8

Please sign in to comment.