Skip to content

Commit

Permalink
feat: add SyncNetworkTopology and SyncProbes to scheduler client (#2114)
Browse files Browse the repository at this point in the history
Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Feb 24, 2023
1 parent 1df0139 commit 92a19b5
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 12 deletions.
2 changes: 1 addition & 1 deletion client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
}
}

managerClient, err = managerclient.GetV1ByAddr(
managerClient, err = managerclient.GetV1ByNetAddrs(
context.Background(), opt.Scheduler.Manager.NetAddrs, grpc.WithTransportCredentials(grpcCredentials))
if err != nil {
return nil, err
Expand Down
8 changes: 8 additions & 0 deletions client/daemon/peer/peertask_dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ func (d *dummySchedulerClient) AnnounceTask(ctx context.Context, request *schedu
panic("should not call this function")
}

func (d *dummySchedulerClient) SyncProbes(ctx context.Context, req *schedulerv1.SyncProbesRequest, opts ...grpc.CallOption) (schedulerv1.Scheduler_SyncProbesClient, error) {
panic("should not call this function")
}

func (d *dummySchedulerClient) SyncNetworkTopology(ctx context.Context, req *schedulerv1.SyncNetworkTopologyRequest, opts ...grpc.CallOption) (schedulerv1.Scheduler_SyncNetworkTopologyClient, error) {
panic("should not call this function")
}

func (d *dummySchedulerClient) Close() error {
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/rpc/manager/client/client_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ import (
"d7y.io/dragonfly/v2/pkg/reachable"
)

// GetV1 returns v1 version of the manager client.
func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, error) {
// GetV1ByAddr returns v1 version of the manager client by address.
func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V1, error) {
conn, err := grpc.DialContext(
ctx,
target,
Expand Down Expand Up @@ -73,13 +73,13 @@ func GetV1(ctx context.Context, target string, opts ...grpc.DialOption) (V1, err
}, nil
}

// GetV1ByAddr returns v1 version of the manager client with addresses.
func GetV1ByAddr(ctx context.Context, netAddrs []dfnet.NetAddr, opts ...grpc.DialOption) (V1, error) {
// GetV1ByNetAddrs returns v1 version of the manager client with net addresses.
func GetV1ByNetAddrs(ctx context.Context, netAddrs []dfnet.NetAddr, opts ...grpc.DialOption) (V1, error) {
for _, netAddr := range netAddrs {
ipReachable := reachable.New(&reachable.Config{Address: netAddr.Addr})
if err := ipReachable.Check(); err == nil {
logger.Infof("use %s address for manager grpc client", netAddr.Addr)
return GetV1(ctx, netAddr.Addr, opts...)
return GetV1ByAddr(ctx, netAddr.Addr, opts...)
}
logger.Warnf("%s manager address can not reachable", netAddr.Addr)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/rpc/manager/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ import (
"d7y.io/dragonfly/v2/pkg/reachable"
)

// GetV2 returns v2 version of the manager client.
func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, error) {
// GetV2ByAddr returns v2 version of the manager client by address.
func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V2, error) {
conn, err := grpc.DialContext(
ctx,
target,
Expand Down Expand Up @@ -73,13 +73,13 @@ func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, err
}, nil
}

// GetV2ByAddr returns v2 version of the manager client with addresses.
func GetV2ByAddr(ctx context.Context, netAddrs []dfnet.NetAddr, opts ...grpc.DialOption) (V2, error) {
// GetV2ByNetAddrs returns v2 version of the manager client with net addresses.
func GetV2ByNetAddrs(ctx context.Context, netAddrs []dfnet.NetAddr, opts ...grpc.DialOption) (V2, error) {
for _, netAddr := range netAddrs {
ipReachable := reachable.New(&reachable.Config{Address: netAddr.Addr})
if err := ipReachable.Check(); err == nil {
logger.Infof("use %s address for manager grpc client", netAddr.Addr)
return GetV2(ctx, netAddr.Addr, opts...)
return GetV2ByAddr(ctx, netAddr.Addr, opts...)
}
logger.Warnf("%s manager address can not reachable", netAddr.Addr)
}
Expand Down
64 changes: 64 additions & 0 deletions pkg/rpc/scheduler/client/client_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,42 @@ func GetV1(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
}, nil
}

// GetV1ByAddr returns v2 version of the scheduler client by address.
func GetV1ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V1, error) {
conn, err := grpc.DialContext(
ctx,
target,
append([]grpc.DialOption{
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
grpc_retry.WithMax(maxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)),
),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
}, opts...)...,
)
if err != nil {
return nil, err
}

return &v1{
SchedulerClient: schedulerv1.NewSchedulerClient(conn),
ClientConn: conn,
dialOptions: opts,
}, nil
}

// V1 is the interface for v1 version of the grpc client.
type V1 interface {
// RegisterPeerTask registers a peer into task.
Expand All @@ -112,6 +148,12 @@ type V1 interface {
// LeaveHost releases host in scheduler.
LeaveHost(context.Context, *schedulerv1.LeaveHostRequest, ...grpc.CallOption) error

// SyncProbes sync probes of the host.
SyncProbes(context.Context, *schedulerv1.SyncProbesRequest, ...grpc.CallOption) (schedulerv1.Scheduler_SyncProbesClient, error)

// SyncNetworkTopology sync network topology of the hosts.
SyncNetworkTopology(context.Context, *schedulerv1.SyncNetworkTopologyRequest, ...grpc.CallOption) (schedulerv1.Scheduler_SyncNetworkTopologyClient, error)

// Close tears down the ClientConn and all underlying connections.
Close() error
}
Expand Down Expand Up @@ -266,3 +308,25 @@ func (v *v1) LeaveHost(ctx context.Context, req *schedulerv1.LeaveHostRequest, o

return eg.Wait()
}

// SyncProbes sync probes of the host.
func (v *v1) SyncProbes(ctx context.Context, req *schedulerv1.SyncProbesRequest, opts ...grpc.CallOption) (schedulerv1.Scheduler_SyncProbesClient, error) {
stream, err := v.SchedulerClient.SyncProbes(ctx, opts...)
if err != nil {
return nil, err
}

// Send begin of piece.
return stream, stream.Send(req)
}

// SyncNetworkTopology sync network topology of the hosts.
func (v *v1) SyncNetworkTopology(ctx context.Context, req *schedulerv1.SyncNetworkTopologyRequest, opts ...grpc.CallOption) (schedulerv1.Scheduler_SyncNetworkTopologyClient, error) {
stream, err := v.SchedulerClient.SyncNetworkTopology(ctx, opts...)
if err != nil {
return nil, err
}

// Send begin of piece.
return stream, stream.Send(req)
}
64 changes: 64 additions & 0 deletions pkg/rpc/scheduler/client/client_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,42 @@ func GetV2(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOpt
}, nil
}

// GetV2ByAddr returns v2 version of the scheduler client by address.
func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V2, error) {
conn, err := grpc.DialContext(
ctx,
target,
append([]grpc.DialOption{
grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig),
grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
rpc.ConvertErrorUnaryClientInterceptor,
otelgrpc.UnaryClientInterceptor(),
grpc_prometheus.UnaryClientInterceptor,
grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
grpc_retry.UnaryClientInterceptor(
grpc_retry.WithMax(maxRetries),
grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)),
),
)),
grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
rpc.ConvertErrorStreamClientInterceptor,
otelgrpc.StreamClientInterceptor(),
grpc_prometheus.StreamClientInterceptor,
grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
)),
}, opts...)...,
)
if err != nil {
return nil, err
}

return &v2{
SchedulerClient: schedulerv2.NewSchedulerClient(conn),
ClientConn: conn,
dialOptions: opts,
}, nil
}

// V2 is the interface for v1 version of the grpc client.
type V2 interface {
// AnnouncePeer announces peer to scheduler.
Expand All @@ -109,6 +145,12 @@ type V2 interface {
// LeaveHost releases host in scheduler.
LeaveHost(context.Context, *schedulerv2.LeaveHostRequest, ...grpc.CallOption) error

// SyncProbes sync probes of the host.
SyncProbes(context.Context, *schedulerv2.SyncProbesRequest, ...grpc.CallOption) (schedulerv2.Scheduler_SyncProbesClient, error)

// SyncNetworkTopology sync network topology of the hosts.
SyncNetworkTopology(context.Context, *schedulerv2.SyncNetworkTopologyRequest, ...grpc.CallOption) (schedulerv2.Scheduler_SyncNetworkTopologyClient, error)

// Close tears down the ClientConn and all underlying connections.
Close() error
}
Expand Down Expand Up @@ -239,3 +281,25 @@ func (v *v2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest, o

return eg.Wait()
}

// SyncProbes sync probes of the host.
func (v *v2) SyncProbes(ctx context.Context, req *schedulerv2.SyncProbesRequest, opts ...grpc.CallOption) (schedulerv2.Scheduler_SyncProbesClient, error) {
stream, err := v.SchedulerClient.SyncProbes(ctx, opts...)
if err != nil {
return nil, err
}

// Send begin of piece.
return stream, stream.Send(req)
}

// SyncNetworkTopology sync network topology of the hosts.
func (v *v2) SyncNetworkTopology(ctx context.Context, req *schedulerv2.SyncNetworkTopologyRequest, opts ...grpc.CallOption) (schedulerv2.Scheduler_SyncNetworkTopologyClient, error) {
stream, err := v.SchedulerClient.SyncNetworkTopology(ctx, opts...)
if err != nil {
return nil, err
}

// Send begin of piece.
return stream, stream.Send(req)
}
40 changes: 40 additions & 0 deletions pkg/rpc/scheduler/client/mocks/client_v1_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 40 additions & 0 deletions pkg/rpc/scheduler/client/mocks/client_v2_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
}

// Initialize manager client.
managerClient, err := managerclient.GetV2(ctx, cfg.Manager.Addr, managerDialOptions...)
managerClient, err := managerclient.GetV2ByAddr(ctx, cfg.Manager.Addr, managerDialOptions...)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 92a19b5

Please sign in to comment.