Skip to content

Commit

Permalink
Merge pull request #127 from Terry-Mao/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
Terry-Mao authored Aug 17, 2016
2 parents b1d6fd2 + fb900d6 commit ec6ea9e
Show file tree
Hide file tree
Showing 24 changed files with 647 additions and 373 deletions.
28 changes: 10 additions & 18 deletions comet/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"goim/libs/define"
"goim/libs/proto"
"goim/libs/time"
"sync"
"sync/atomic"
)
Expand All @@ -24,11 +23,10 @@ type Bucket struct {
rooms map[int32]*Room // bucket room channels
routines []chan *proto.BoardcastRoomArg
routinesNum int64
roptions RoomOptions
}

// NewBucket new a bucket struct. store the key with im channel.
func NewBucket(boptions BucketOptions, roptions RoomOptions) (b *Bucket) {
func NewBucket(boptions BucketOptions) (b *Bucket) {
b = new(Bucket)
b.chs = make(map[string]*Channel, boptions.ChannelSize)
b.boptions = boptions
Expand All @@ -37,7 +35,6 @@ func NewBucket(boptions BucketOptions, roptions RoomOptions) (b *Bucket) {
b.rooms = make(map[int32]*Room, boptions.RoomSize)
b.routines = make([]chan *proto.BoardcastRoomArg, boptions.RoutineAmount)
b.routinesNum = int64(0)
b.roptions = roptions
for i := int64(0); i < boptions.RoutineAmount; i++ {
c := make(chan *proto.BoardcastRoomArg, boptions.RoutineSize)
b.routines[i] = c
Expand All @@ -47,7 +44,7 @@ func NewBucket(boptions BucketOptions, roptions RoomOptions) (b *Bucket) {
}

// Put put a channel according with sub key.
func (b *Bucket) Put(key string, ch *Channel, tr *time.Timer) (err error) {
func (b *Bucket) Put(key string, ch *Channel) (err error) {
var (
room *Room
ok bool
Expand All @@ -56,7 +53,7 @@ func (b *Bucket) Put(key string, ch *Channel, tr *time.Timer) (err error) {
b.chs[key] = ch
if ch.RoomId != define.NoRoom {
if room, ok = b.rooms[ch.RoomId]; !ok {
room = NewRoom(ch.RoomId, tr, b.roptions)
room = NewRoom(ch.RoomId)
b.rooms[ch.RoomId] = room
}
}
Expand All @@ -71,7 +68,6 @@ func (b *Bucket) Put(key string, ch *Channel, tr *time.Timer) (err error) {
func (b *Bucket) Del(key string) {
var (
ok bool
drop bool
ch *Channel
room *Room
)
Expand All @@ -83,11 +79,9 @@ func (b *Bucket) Del(key string) {
}
}
b.cLock.Unlock()
if room != nil {
if room != nil && room.Del(ch) {
// if empty room, must delete from bucket
if drop = room.Del(ch); drop {
b.DelRoom(ch.RoomId)
}
b.DelRoom(ch.RoomId)
}
}

Expand Down Expand Up @@ -147,7 +141,7 @@ func (b *Bucket) Rooms() (res map[int32]struct{}) {
res = make(map[int32]struct{})
b.cLock.RLock()
for roomId, room = range b.rooms {
if room.Online() > 0 {
if room.Online > 0 {
res[roomId] = struct{}{}
}
}
Expand All @@ -157,16 +151,14 @@ func (b *Bucket) Rooms() (res map[int32]struct{}) {

// roomproc
func (b *Bucket) roomproc(c chan *proto.BoardcastRoomArg) {
var (
arg *proto.BoardcastRoomArg
room *Room
)
for {
var (
arg *proto.BoardcastRoomArg
room *Room
)
arg = <-c
if room = b.Room(arg.RoomId); room != nil {
room.Push(&arg.P)
}
arg = nil
room = nil
}
}
9 changes: 4 additions & 5 deletions comet/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package main
import (
"goim/libs/bufio"
"goim/libs/proto"

log "github.com/thinkboy/log4go"
)

// Channel used by message pusher send msg to write goroutine.
Expand All @@ -14,6 +12,8 @@ type Channel struct {
signal chan *proto.Proto
Writer bufio.Writer
Reader bufio.Reader
Next *Channel
Prev *Channel
}

func NewChannel(cli, svr int, rid int32) *Channel {
Expand All @@ -34,9 +34,8 @@ func (c *Channel) Push(p *proto.Proto) (err error) {
}

// Ready check the channel ready or close?
func (c *Channel) Ready() (p *proto.Proto) {
p = <-c.signal
return
func (c *Channel) Ready() *proto.Proto {
return <-c.signal
}

// Signal send signal to the channel, protocol ready.
Expand Down
30 changes: 16 additions & 14 deletions comet/comet-example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,20 @@ server.id 1
# debug 1
debug 1

[slow]
# push slow log
log ./comet-slow.log
# slow push time
time 1s
# Whitelist keys.
#
# Examples:
#
# white.list 123_,321_
white.list 123_,321_

# Whitelist log file.
#
# Examples:
#
# white.log ./white_list.log
white.log /tmp/white_list.log


[tcp]
# By default comet listens for connections from all the network interfaces
Expand Down Expand Up @@ -275,18 +284,11 @@ routine.amount 128
#
routine.size 20

[room]
# room's channel cache num
#
# Examples:
#
# channel 1024
channel 128

[logic]
# logic service rpc address
# set(logic1, logic2)
#
# Examples:
#
# rpc.addrs tcp@localhost:7170
# rpc.addrs tcp@localhost:7170,tcp@localhost:7170
rpc.addrs tcp@localhost:7170
9 changes: 3 additions & 6 deletions comet/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ type Config struct {
StatBind []string `goconf:"base:stat.bind:,"`
ServerId int32 `goconf:"base:server.id"`
Debug bool `goconf:"base:debug"`
// slow log
SlowLog string `goconf:"slow:log"`
SlowTime time.Duration `goconf:"slow:time,time"`
Whitelist []string `goconf:"base:white.list:,"`
WhiteLog string `goconf:"base:white.log"`
// tcp
TCPBind []string `goconf:"tcp:bind:,"`
TCPSndbuf int `goconf:"tcp:sndbuf:memory"`
Expand Down Expand Up @@ -81,12 +80,10 @@ type Config struct {
BucketRoom int `goconf:"bucket:room"`
RoutineAmount int64 `goconf:"bucket:routine.amount"`
RoutineSize int `goconf:"bucket:routine.size"`
// room
RoomChannel int `goconf:"room:channel"`
// push
RPCPushAddrs []string `goconf:"push:rpc.addrs:,"`
// logic
LogicAddr string `goconf:"logic:rpc.addrs"`
LogicAddrs []string `goconf:"logic:rpc.addrs:,"`
}

func NewConfig() *Config {
Expand Down
87 changes: 30 additions & 57 deletions comet/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ package main

import (
inet "goim/libs/net"
"goim/libs/net/xrpc"
"goim/libs/proto"
"net/rpc"
"time"

log "github.com/thinkboy/log4go"
)

var (
logicRpcClient *rpc.Client
logicRpcClient *xrpc.Clients
logicRpcQuit = make(chan struct{}, 1)

logicService = "RPC"
Expand All @@ -19,62 +19,37 @@ var (
logicServiceDisconnect = "RPC.Disconnect"
)

func InitLogicRpc(addrs string) (err error) {
var network, addr string
if network, addr, err = inet.ParseNetwork(addrs); err != nil {
log.Error("inet.ParseNetwork() error(%v)", err)
return
}
logicRpcClient, err = rpc.Dial(network, addr)
if err != nil {
log.Error("rpc.Dial(\"%s\", \"%s\") error(%s)", network, addr, err)
}
go Reconnect(&logicRpcClient, logicRpcQuit, network, addr)
log.Debug("logic rpc addr %s:%s connected", network, addr)
return
}

// Reconnect for ping rpc server and reconnect with it when it's crash.
func Reconnect(dst **rpc.Client, quit chan struct{}, network, address string) {
func InitLogicRpc(addrs []string) (err error) {
var (
tmp *rpc.Client
err error
call *rpc.Call
ch = make(chan *rpc.Call, 1)
client = *dst
args = proto.NoArg{}
reply = proto.NoReply{}
bind string
network, addr string
rpcOptions []xrpc.ClientOptions
)
for {
select {
case <-quit:
for _, bind = range addrs {
if network, addr, err = inet.ParseNetwork(bind); err != nil {
log.Error("inet.ParseNetwork() error(%v)", err)
return
default:
if client != nil {
call = <-client.Go(logicServicePing, &args, &reply, ch).Done
if call.Error != nil {
log.Error("rpc ping %s error(%v)", address, call.Error)
}
}
if client == nil || call.Error != nil {
if tmp, err = rpc.Dial(network, address); err == nil {
*dst = tmp
client = tmp
}
}
}
time.Sleep(1 * time.Second)
options := xrpc.ClientOptions{
Proto: network,
Addr: addr,
}
rpcOptions = append(rpcOptions, options)
}
// rpc clients
logicRpcClient = xrpc.Dials(rpcOptions)
// ping & reconnect
logicRpcClient.Ping(logicServicePing)
log.Info("init logic rpc: %v", rpcOptions)
return
}

func connect(p *proto.Proto) (key string, rid int32, heartbeat time.Duration, err error) {
if logicRpcClient == nil {
err = ErrLogic
return
}
arg := &proto.ConnArg{Token: string(p.Body), Server: Conf.ServerId}
reply := &proto.ConnReply{}
if err = logicRpcClient.Call(logicServiceConnect, arg, reply); err != nil {
var (
arg = proto.ConnArg{Token: string(p.Body), Server: Conf.ServerId}
reply = proto.ConnReply{}
)
if err = logicRpcClient.Call(logicServiceConnect, &arg, &reply); err != nil {
log.Error("c.Call(\"%s\", \"%v\", &ret) error(%v)", logicServiceConnect, arg, err)
return
}
Expand All @@ -85,13 +60,11 @@ func connect(p *proto.Proto) (key string, rid int32, heartbeat time.Duration, er
}

func disconnect(key string, roomId int32) (has bool, err error) {
if logicRpcClient == nil {
err = ErrLogic
return
}
arg := &proto.DisconnArg{Key: key, RoomId: roomId}
reply := &proto.DisconnReply{}
if err = logicRpcClient.Call(logicServiceDisconnect, arg, reply); err != nil {
var (
arg = proto.DisconnArg{Key: key, RoomId: roomId}
reply = proto.DisconnReply{}
)
if err = logicRpcClient.Call(logicServiceDisconnect, &arg, &reply); err != nil {
log.Error("c.Call(\"%s\", \"%v\", &ret) error(%v)", logicServiceConnect, arg, err)
return
}
Expand Down
21 changes: 11 additions & 10 deletions comet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
)

var (
DefaultServer *Server
Debug bool
DefaultServer *Server
DefaultWhitelist *Whitelist
Debug bool
)

func main() {
Expand All @@ -23,14 +24,15 @@ func main() {
log.LoadConfiguration(Conf.Log)
defer log.Close()
log.Info("comet[%s] start", Ver)
perf.Init(Conf.PprofBind)
// init slow log
// TODO need to performance optimizition, so stop to use slow log
/*if err := initSlowLog(Conf.SlowLog); err != nil {
// white list log
if wl, err := NewWhitelist(Conf.WhiteLog, Conf.Whitelist); err != nil {
panic(err)
}*/
} else {
DefaultWhitelist = wl
}
perf.Init(Conf.PprofBind)
// logic rpc
if err := InitLogicRpc(Conf.LogicAddr); err != nil {
if err := InitLogicRpc(Conf.LogicAddrs); err != nil {
log.Warn("logic rpc current can't connect, retry")
}
// new server
Expand All @@ -41,8 +43,6 @@ func main() {
RoomSize: Conf.BucketRoom,
RoutineAmount: Conf.RoutineAmount,
RoutineSize: Conf.RoutineSize,
}, RoomOptions{
ChannelSize: Conf.RoomChannel,
})
}
round := NewRound(RoundOptions{
Expand All @@ -64,6 +64,7 @@ func main() {
TCPRcvbuf: Conf.TCPRcvbuf,
TCPSndbuf: Conf.TCPSndbuf,
})
// white list
// tcp comet
if err := InitTCP(Conf.TCPBind, Conf.MaxProc); err != nil {
panic(err)
Expand Down
Loading

0 comments on commit ec6ea9e

Please sign in to comment.