Skip to content

Commit

Permalink
增加通知上线接口
Browse files Browse the repository at this point in the history
  • Loading branch information
sh7ning committed Dec 3, 2018
1 parent 7e04403 commit ab9e7a5
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 7 deletions.
5 changes: 3 additions & 2 deletions config.ini → .env
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ SOCKET_PORT=8900
# SOCKET_CERT_FILE=
# SOCKET_KEY_FILE=

GATEWAY_API_ADDRESS=127.0.0.1:8901
#GATEWAY_API_ADDRESS=192.168.3.92:8901
GATEWAY_API_ADDRESS=192.168.3.92:8901
GATEWAY_API_TOKEN=token

NOTIFICATION_URL=http://message.demo.com/im/index/rpc
NOTIFICATION_URL=http://sdk.demo.com
NOTIFICATION_USER_AGENT="Gopusher 1.0"
7 changes: 3 additions & 4 deletions comet/comet.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package comet

import (
"github.com/gopusher/gateway/notification"
"github.com/gopusher/gateway/configuration"
"github.com/gopusher/gateway/contracts"
"github.com/gopusher/gateway/connection/websocket"
Expand All @@ -15,17 +14,17 @@ func Run() {

go server.Run()

go server.JoinCluster()

api.InitRpcServer(server, config)
}

func getCometServer(config *configuration.CometConfig) contracts.Server {
rpc := notification.NewRpc(config.NotificationUrl, config.NotificationUserAgent)

switch config.SocketProtocol {
case "ws":
fallthrough
case "wss":
return websocket.NewWebSocketServer(config, rpc)
return websocket.NewWebSocketServer(config)
case "tcp": //暂时不处理
panic("Unsupported protocol: " + config.SocketProtocol)
default:
Expand Down
15 changes: 14 additions & 1 deletion connection/websocket/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"github.com/gopusher/gateway/notification"
"github.com/gopusher/gateway/log"
"time"
)

type Server struct {
Expand All @@ -18,7 +19,7 @@ type Server struct {
clients map[string]*Client
}

func NewWebSocketServer(config *configuration.CometConfig, rpc *notification.Client) *Server {
func NewWebSocketServer(config *configuration.CometConfig) *Server {
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
Expand All @@ -27,6 +28,7 @@ func NewWebSocketServer(config *configuration.CometConfig, rpc *notification.Cli
},
}

rpc := notification.NewRpc(config.NotificationUrl, config.NotificationUserAgent)
return &Server{
config: config,
rpc: rpc,
Expand Down Expand Up @@ -210,3 +212,14 @@ func (s *Server) GetAllConnections() []string {

return connectionIds
}

func (s *Server) JoinCluster() {
//wait for rpc and ws server bootstrap
time.Sleep(time.Duration(5)*time.Second)
//notify router api server
if _, err := s.rpc.Call("JoinCluster", s.config.NodeId); err != nil {
log.Error("Gateway JoinCluster notification failed: %s", err.Error())
}

log.Info("Gateway JoinCluster notification success")
}
1 change: 1 addition & 0 deletions contracts/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ type Server interface {
KickAllConnections()
CheckConnectionsOnline(connections []string) []string
GetAllConnections() []string
JoinCluster()
}

0 comments on commit ab9e7a5

Please sign in to comment.