diff --git a/admin-cli/cmd/init.go b/admin-cli/cmd/init.go index f910d67eb7..50fcb28395 100644 --- a/admin-cli/cmd/init.go +++ b/admin-cli/cmd/init.go @@ -34,5 +34,5 @@ var pegasusClient *executor.Client func Init(metaList []string) { globalMetaList = metaList - pegasusClient = executor.NewClient(os.Stdout, globalMetaList) + pegasusClient, _ = executor.NewClient(os.Stdout, globalMetaList, true) } diff --git a/admin-cli/executor/client.go b/admin-cli/executor/client.go index b41c7189f0..62998177a2 100644 --- a/admin-cli/executor/client.go +++ b/admin-cli/executor/client.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "os" + "strings" "github.com/apache/incubator-pegasus/admin-cli/client" "github.com/apache/incubator-pegasus/admin-cli/util" @@ -45,14 +46,18 @@ type Client struct { } // NewClient creates a client for accessing Pegasus cluster for use of admin-cli. -func NewClient(writer io.Writer, metaAddrs []string) *Client { +// When listing nodes fails, willExit == true means call os.Exit(). +func NewClient(writer io.Writer, metaAddrs []string, willExit bool) (*Client, error) { meta := client.NewRPCBasedMeta(metaAddrs) // TODO(wutao): initialize replica-nodes lazily nodes, err := meta.ListNodes() if err != nil { - fmt.Fprintf(writer, "fatal: failed to list nodes [%s]\n", err) - os.Exit(1) + fmt.Printf("Error: failed to list nodes [%s]\n", err) + if willExit { + os.Exit(1) + } + return nil, fmt.Errorf("failed to list nodes [%s]", err) } var replicaAddrs []string @@ -65,5 +70,26 @@ func NewClient(writer io.Writer, metaAddrs []string) *Client { Meta: meta, Nodes: util.NewPegasusNodeManager(metaAddrs, replicaAddrs), Perf: aggregate.NewPerfClient(metaAddrs), + }, nil +} + +func CloseClient(client *Client) error { + var errorStrings []string + err := client.Meta.Close() + if err != nil { + fmt.Printf("Error: failed to close meta session [%s].\n", err) + errorStrings = append(errorStrings, err.Error()) + } + + client.Perf.Close() + + err = client.Nodes.CloseAllNodes() + if err != nil { + fmt.Printf("Error: failed to close nodes session [%s].\n", err) + errorStrings = append(errorStrings, err.Error()) + } + if len(errorStrings) != 0 { + return fmt.Errorf("%s", strings.Join(errorStrings, "\n")) } + return nil } diff --git a/admin-cli/executor/disk_info.go b/admin-cli/executor/disk_info.go index a7793bb4d1..83c49511d9 100644 --- a/admin-cli/executor/disk_info.go +++ b/admin-cli/executor/disk_info.go @@ -45,7 +45,7 @@ func QueryDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, } func GetDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, tableName string, diskTag string, print bool) ([]interface{}, error) { - resp, err := sendQueryDiskInfoRequest(client, replicaServer, tableName) + resp, err := SendQueryDiskInfoRequest(client, replicaServer, tableName) if err != nil { return nil, err } @@ -60,7 +60,7 @@ func GetDiskInfo(client *Client, infoType DiskInfoType, replicaServer string, ta } } -func sendQueryDiskInfoRequest(client *Client, replicaServer string, tableName string) (*radmin.QueryDiskInfoResponse, error) { +func SendQueryDiskInfoRequest(client *Client, replicaServer string, tableName string) (*radmin.QueryDiskInfoResponse, error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() @@ -88,7 +88,7 @@ func QueryAllNodesDiskInfo(client *Client, tableName string) (map[string]*radmin } for _, nodeInfo := range nodeInfos { address := nodeInfo.GetAddress().GetAddress() - resp, err := sendQueryDiskInfoRequest(client, address, tableName) + resp, err := SendQueryDiskInfoRequest(client, address, tableName) if err != nil { return respMap, err } diff --git a/admin-cli/executor/toolkits/tablemigrator/switcher.go b/admin-cli/executor/toolkits/tablemigrator/switcher.go index 76c223a98d..41f2be6d05 100644 --- a/admin-cli/executor/toolkits/tablemigrator/switcher.go +++ b/admin-cli/executor/toolkits/tablemigrator/switcher.go @@ -59,7 +59,8 @@ func SwitchMetaAddrs(client *executor.Client, zkAddr string, zkRoot string, tabl originMeta := client.Meta targetAddrList := strings.Split(targetAddrs, ",") - targetMeta := executor.NewClient(os.Stdout, targetAddrList).Meta + pegasusClient, _ := executor.NewClient(os.Stdout, targetAddrList, true) + targetMeta := pegasusClient.Meta env := map[string]string{ "replica.deny_client_request": "reconfig*all", } diff --git a/admin-cli/util/pegasus_node.go b/admin-cli/util/pegasus_node.go index 5f7320820c..e0554008f3 100644 --- a/admin-cli/util/pegasus_node.go +++ b/admin-cli/util/pegasus_node.go @@ -84,6 +84,13 @@ func (n *PegasusNode) RPCAddress() *base.RPCAddress { return base.NewRPCAddress(n.IP, n.Port) } +func (n *PegasusNode) Close() error { + if n.session != nil { + return n.session.Close() + } + return nil +} + // NewNodeFromTCPAddr creates a node from tcp address. // NOTE: // - Will not initialize TCP connection unless needed. @@ -211,3 +218,17 @@ func (m *PegasusNodeManager) GetPerfSession(addr string, ntype session.NodeType) return aggregate.WrapPerf(addr, node.session) } + +func (m *PegasusNodeManager) CloseAllNodes() error { + var errorStrings []string + for _, n := range m.nodes { + err := n.Close() + if err != nil { + errorStrings = append(errorStrings, err.Error()) + } + } + if len(errorStrings) != 0 { + return fmt.Errorf("%s", strings.Join(errorStrings, "\n")) + } + return nil +}