Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: support close admin-cli client #2162

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion admin-cli/cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ var pegasusClient *executor.Client
func Init(metaList []string) {
globalMetaList = metaList

pegasusClient = executor.NewClient(os.Stdout, globalMetaList)
pegasusClient, _ = executor.NewClient(os.Stdout, globalMetaList, true)
}
30 changes: 27 additions & 3 deletions admin-cli/executor/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"io"
"os"
"strings"

"github.com/apache/incubator-pegasus/admin-cli/client"
"github.com/apache/incubator-pegasus/admin-cli/util"
Expand All @@ -45,14 +46,18 @@ type Client struct {
}

// NewClient creates a client for accessing Pegasus cluster for use of admin-cli.
func NewClient(writer io.Writer, metaAddrs []string) *Client {
// When listing nodes fails, willExit == true means call os.Exit().
func NewClient(writer io.Writer, metaAddrs []string, willExit bool) (*Client, error) {
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
meta := client.NewRPCBasedMeta(metaAddrs)

// TODO(wutao): initialize replica-nodes lazily
nodes, err := meta.ListNodes()
if err != nil {
fmt.Fprintf(writer, "fatal: failed to list nodes [%s]\n", err)
os.Exit(1)
fmt.Printf("Error: failed to list nodes [%s]\n", err)
if willExit {
os.Exit(1)
}
return nil, fmt.Errorf("failed to list nodes [%s]", err)
}

var replicaAddrs []string
Expand All @@ -65,5 +70,24 @@ func NewClient(writer io.Writer, metaAddrs []string) *Client {
Meta: meta,
Nodes: util.NewPegasusNodeManager(metaAddrs, replicaAddrs),
Perf: aggregate.NewPerfClient(metaAddrs),
}, nil
}

func CloseClient(writer io.Writer, client *Client) error {
acelyc111 marked this conversation as resolved.
Show resolved Hide resolved
var errorStrings []string
err := client.Meta.Close()
if err != nil {
fmt.Fprintf(writer, "fatal: failed to close meta session [%s]\n", err)
errorStrings = append(errorStrings, err.Error())
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
}

client.Perf.Close()

err = client.Nodes.CloseAllNodes()
if err != nil {
fmt.Fprintf(writer, "fatal: failed to close nodes session [%s]\n", err)
errorStrings = append(errorStrings, err.Error())
}

return fmt.Errorf("%s", strings.Join(errorStrings, "\n"))
ruojieranyishen marked this conversation as resolved.
Show resolved Hide resolved
}
6 changes: 3 additions & 3 deletions admin-cli/executor/disk_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func QueryDiskInfo(client *Client, infoType DiskInfoType, replicaServer string,
}

func GetDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, tableName string, diskTag string, print bool) ([]interface{}, error) {
resp, err := sendQueryDiskInfoRequest(client, replicaServer, tableName)
resp, err := SendQueryDiskInfoRequest(client, replicaServer, tableName)
if err != nil {
return nil, err
}
Expand All @@ -60,7 +60,7 @@ func GetDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, ta
}
}

func sendQueryDiskInfoRequest(client *Client, replicaServer string, tableName string) (*radmin.QueryDiskInfoResponse, error) {
func SendQueryDiskInfoRequest(client *Client, replicaServer string, tableName string) (*radmin.QueryDiskInfoResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

Expand Down Expand Up @@ -88,7 +88,7 @@ func QueryAllNodesDiskInfo(client *Client, tableName string) (map[string]*radmin
}
for _, nodeInfo := range nodeInfos {
address := nodeInfo.GetAddress().GetAddress()
resp, err := sendQueryDiskInfoRequest(client, address, tableName)
resp, err := SendQueryDiskInfoRequest(client, address, tableName)
if err != nil {
return respMap, err
}
Expand Down
3 changes: 2 additions & 1 deletion admin-cli/executor/toolkits/tablemigrator/switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ func SwitchMetaAddrs(client *executor.Client, zkAddr string, zkRoot string, tabl

originMeta := client.Meta
targetAddrList := strings.Split(targetAddrs, ",")
targetMeta := executor.NewClient(os.Stdout, targetAddrList).Meta
pegasusClient, _ := executor.NewClient(os.Stdout, targetAddrList, true)
targetMeta := pegasusClient.Meta
env := map[string]string{
"replica.deny_client_request": "reconfig*all",
}
Expand Down
21 changes: 21 additions & 0 deletions admin-cli/util/pegasus_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ func (n *PegasusNode) RPCAddress() *base.RPCAddress {
return base.NewRPCAddress(n.IP, n.Port)
}

func (n *PegasusNode) Close() error {
if n.session != nil {
return n.session.Close()
}
return nil
}

// NewNodeFromTCPAddr creates a node from tcp address.
// NOTE:
// - Will not initialize TCP connection unless needed.
Expand Down Expand Up @@ -211,3 +218,17 @@ func (m *PegasusNodeManager) GetPerfSession(addr string, ntype session.NodeType)

return aggregate.WrapPerf(addr, node.session)
}

func (m *PegasusNodeManager) CloseAllNodes() error {
var errorStrings []string
for _, n := range m.nodes {
err := n.Close()
if err != nil {
errorStrings = append(errorStrings, err.Error())
}
}
if len(errorStrings) != 0 {
return fmt.Errorf("%s", strings.Join(errorStrings, "\n"))
}
return nil
}
Loading