Skip to content

Commit

Permalink
fixed anzi for ioutil.Discard (#72)
Browse files Browse the repository at this point in the history
* add anzi rdb callback ignore receive for io.discard

* fixed of expire at data

* add buffer of end of rdb channel

* change import path

* add overlord version of all sub commands

* add version str
  • Loading branch information
wayslog authored Jun 18, 2019
1 parent d8063cd commit 3b67fd6
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 35 deletions.
55 changes: 48 additions & 7 deletions anzi/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package anzi
import (
"bufio"
"fmt"
"io"
"io/ioutil"
"net"
"strconv"
"sync"
"time"

"overlord/pkg/log"
)
Expand Down Expand Up @@ -51,7 +55,8 @@ type RDBCallback interface {
// NewProtocolCallbacker convert them as callback
func NewProtocolCallbacker(addr string) *ProtocolCallbacker {
p := &ProtocolCallbacker{
addr: addr,
addr: addr,
endOfRDB: make(chan struct{}, 1),
}

conn, err := net.Dial("tcp", addr)
Expand All @@ -60,6 +65,8 @@ func NewProtocolCallbacker(addr string) *ProtocolCallbacker {
} else {
p.conn = conn
p.bw = bufio.NewWriter(conn)
p.br = bufio.NewReader(conn)
go p.ignoreRecv()
}

return p
Expand All @@ -69,8 +76,33 @@ func NewProtocolCallbacker(addr string) *ProtocolCallbacker {
// protocol data into downstream.
type ProtocolCallbacker struct {
addr string

lock sync.RWMutex
conn net.Conn
bw *bufio.Writer
br *bufio.Reader

endOfRDB chan struct{}
}

func (r *ProtocolCallbacker) ignoreRecv() {
for {
select {
case <-r.endOfRDB:
return
default:
}

r.lock.RLock()
size, err := io.Copy(ioutil.Discard, r.br)
r.lock.RUnlock()
if size == 0 {
time.Sleep(time.Second)
}
if err != nil {
log.Warnf("fail to discard reader due %s", err)
}
}
}

// SelectDB impl Callback
Expand All @@ -95,6 +127,7 @@ func (r *ProtocolCallbacker) ResizeDB(size, esize uint64) {

// EndOfRDB impl Callback
func (r *ProtocolCallbacker) EndOfRDB() {
r.endOfRDB <- struct{}{}
log.Infof("EndOfRDB...")
_ = r.bw.Flush()
}
Expand Down Expand Up @@ -144,23 +177,31 @@ func (r *ProtocolCallbacker) ExpireAt(key []byte, expiry uint64) {
return
}

r.handleErr(writePlainCmd(r.bw, BytesExpireAt, key, []byte(fmt.Sprintf("%d", expiry)), 2))
r.handleErr(writePlainCmd(r.bw, BytesExpireAt, key, []byte(fmt.Sprintf("%d", expiry)), 3))
}

func (r *ProtocolCallbacker) handleErr(err error) {
if err == nil {
return
}

_ = r.conn.Close()
r.conn, err = net.Dial("tcp", r.addr)
var conn net.Conn
conn, err = net.Dial("tcp", r.addr)
if err != nil {
log.Errorf("fail to reconnect due %s", err)
return
}

_ = r.conn.Close()
r.lock.Lock()
r.conn = conn
r.bw = bufio.NewWriter(r.conn)
r.br = bufio.NewReader(r.conn)
r.lock.Unlock()
}

func write4ArgsCmd(w *bufio.Writer, cmd, key, field, val []byte) (err error) {
_ = writeBulkCount(w, 4)
_ = writeArrayCount(w, 4)
_ = writeToBulk(w, cmd)
_ = writeToBulk(w, key)
_ = writeToBulk(w, field)
Expand All @@ -173,14 +214,14 @@ func writePlainCmd(w *bufio.Writer, cmd, key, val []byte, size ...int) (err erro
if len(size) == 1 {
count = size[0]
}
_ = writeBulkCount(w, count)
_ = writeArrayCount(w, count)
_ = writeToBulk(w, cmd)
_ = writeToBulk(w, key)
err = writeToBulk(w, val)
return
}

func writeBulkCount(w *bufio.Writer, size int) (err error) {
func writeArrayCount(w *bufio.Writer, size int) (err error) {
_, err = w.WriteString(fmt.Sprintf("*%d\r\n", size))
return
}
Expand Down
10 changes: 7 additions & 3 deletions cmd/anzi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@ package main

import (
"flag"
"overlord/anzi"

"overlord/pkg/log"

"github.com/BurntSushi/toml"

"overlord/anzi"
"overlord/pkg/log"
"overlord/version"
)

var confPath string

func main() {
flag.StringVar(&confPath, "conf", "anzi.toml", "anzi config file")
flag.Parse()
if version.ShowVersion() {
return
}

conf := new(anzi.Config)
_, err := toml.DecodeFile(confPath, &conf)
Expand Down
9 changes: 7 additions & 2 deletions cmd/apicli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
"net/http"
"strings"

"github.com/pkg/errors"

"overlord/pkg/log"
"overlord/platform/api/model"

"github.com/pkg/errors"
"overlord/version"
)

func main() {
Expand All @@ -21,6 +22,10 @@ func main() {
flag.StringVar(&appid, "appid", "", "appid name")
flag.StringVar(&addr, "addr", "", "addr of node to restart")
flag.Parse()
if version.ShowVersion() {
return
}

log.Init(nil)
var err error
switch {
Expand Down
10 changes: 8 additions & 2 deletions cmd/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package main
import (
"flag"

"github.com/BurntSushi/toml"

"overlord/pkg/log"
"overlord/platform/api/model"
"overlord/platform/api/server"
"overlord/platform/api/service"

"github.com/BurntSushi/toml"
"overlord/version"
)

var (
Expand All @@ -18,6 +19,11 @@ var (
func main() {
flag.StringVar(&confPath, "conf", "conf.toml", "scheduler conf")
flag.Parse()

if version.ShowVersion() {
return
}

conf := new(model.ServerConfig)
_, err := toml.DecodeFile(confPath, &conf)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions cmd/balancer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"overlord/pkg/etcd"
"overlord/pkg/log"
"overlord/platform/job/balance"
"overlord/version"
)

var (
Expand All @@ -18,6 +19,10 @@ func main() {
flag.StringVar(&cluster, "cluster", "", "cluster name")
flag.StringVar(&db, "db", "", "etcd dsn")
flag.Parse()
if version.ShowVersion() {
return
}

log.InitHandle(log.NewStdHandler())
var etcdURL string
if strings.HasPrefix(db, "http://") {
Expand Down
7 changes: 7 additions & 0 deletions cmd/executor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@ package main

import (
"context"
"flag"

"overlord/pkg/log"
"overlord/platform/mesos"
"overlord/version"
)

func main() {
flag.Parse()
if version.ShowVersion() {
return
}

ec := mesos.New()
log.InitHandle(log.NewStdHandler())
ec.Run(context.Background())
Expand Down
34 changes: 14 additions & 20 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,17 @@ import (
"overlord/pkg/prom"
"overlord/proxy"
"overlord/proxy/slowlog"
)

const (
// VERSION version
VERSION = "1.8.0"
"overlord/version"
)

var (
version bool
check bool
stat string
metrics bool
confFile string
clusterConfFile string
reload bool
slowlogFile string
check bool
stat string
metrics bool
confFile string
clusterConfFile string
reload bool
slowlogFile string
slowlogSlowerThan int
)

Expand All @@ -52,7 +47,6 @@ var usage = func() {
func init() {
flag.Usage = usage
flag.BoolVar(&check, "t", false, "conf file check")
flag.BoolVar(&version, "v", false, "print version.")
flag.StringVar(&stat, "stat", "", "stat listen addr. high priority than conf.stat.")
flag.BoolVar(&metrics, "metrics", false, "proxy support prometheus metrics and reuse stat port.")
flag.StringVar(&confFile, "conf", "", "conf file of proxy itself.")
Expand All @@ -64,10 +58,10 @@ func init() {

func main() {
flag.Parse()
if version {
fmt.Printf("overlord version %s\n", VERSION)
if version.ShowVersion() {
os.Exit(0)
}

if check {
parseConfig()
os.Exit(0)
Expand Down Expand Up @@ -101,7 +95,7 @@ func main() {
prom.On = false
}
}
prom.VersionState(VERSION)
prom.VersionState(version.Str())
// hanlde signal
signalHandler()
}
Expand Down Expand Up @@ -143,12 +137,12 @@ func signalHandler() {
var ch = make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {
log.Infof("overlord proxy version[%s] start serving", VERSION)
log.Infof("overlord proxy version[%s] start serving", version.Str())
si := <-ch
log.Infof("overlord proxy version[%s] signal(%s) stop the process", VERSION, si.String())
log.Infof("overlord proxy version[%s] signal(%s) stop the process", version.Str(), si.String())
switch si {
case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
log.Infof("overlord proxy version[%s] exited", VERSION)
log.Infof("overlord proxy version[%s] exited", version.Str())
return
case syscall.SIGHUP:
default:
Expand Down
8 changes: 7 additions & 1 deletion cmd/scheduler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package main

import (
"flag"
"time"

"overlord/pkg/etcd"
"overlord/pkg/log"
"overlord/platform/mesos"
"time"
"overlord/version"

"github.com/BurntSushi/toml"
)
Expand All @@ -24,6 +26,10 @@ var defConf = &mesos.Config{
func main() {
flag.StringVar(&confPath, "conf", "", "scheduler conf")
flag.Parse()
if version.ShowVersion() {
return
}

conf := new(mesos.Config)
if confPath != "" {
_, err := toml.DecodeFile(confPath, &conf)
Expand Down
35 changes: 35 additions & 0 deletions version/version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package version

import (
"flag"
"fmt"
"os"
)

// Define overlord version consts
const (
OverlordMajor = 1
OverlordMinor = 8
OverlordPatch = 1
)

var showVersion bool
var vstr string

func init() {
vstr = fmt.Sprintf("%d.%d.%d", OverlordMajor, OverlordMinor, OverlordPatch)
flag.BoolVar(&showVersion, "version", false, "show version and exit.")
}

// ShowVersion print version if -version flag is seted and return true
func ShowVersion() bool {
if showVersion {
fmt.Fprintln(os.Stdout, vstr)
}
return showVersion
}

// Str is the formatted version string
func Str() string {
return vstr
}

0 comments on commit 3b67fd6

Please sign in to comment.