Skip to content

Commit

Permalink
Merge pull request #38 from DrmagicE/auth
Browse files Browse the repository at this point in the history
feat: add GRPCGatewayRegister interface
  • Loading branch information
DrmagicE authored Dec 19, 2020
2 parents 4bc55a5 + 67c5578 commit 215d372
Show file tree
Hide file tree
Showing 21 changed files with 293 additions and 184 deletions.
3 changes: 3 additions & 0 deletions cmd/gmqttd/command/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,14 @@ func NewStartCmd() *cobra.Command {
)
err = s.Init()
if err != nil {
fmt.Println(err)
os.Exit(1)
return
}
err = s.Run()
if err != nil {
fmt.Println(err)
os.Exit(1)
return
}
installSignal(s)
Expand Down
88 changes: 56 additions & 32 deletions plugin/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/DrmagicE/gmqtt/config"
"github.com/DrmagicE/gmqtt/server"
Expand All @@ -35,6 +33,24 @@ func New(config config.Config) (server.Plugin, error) {

var log *zap.Logger

// GRPCGatewayRegister provides the ability to share the gRPC and HTTP server to other plugins.
type GRPCGatewayRegister interface {
GRPCRegister
HTTPRegister
}

// GRPCRegister is the interface that enable the implement to expose gRPC endpoint.
type GRPCRegister interface {
// RegisterGRPC registers the gRPC handler into gRPC server which created by admin plugin.
RegisterGRPC(s grpc.ServiceRegistrar)
}

// HTTPRegister is the interface that enable the implement to expose HTTP endpoint.
type HTTPRegister interface {
// RegisterHTTP registers the http handler into http server which created by admin plugin.
RegisterHTTP(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error)
}

// Admin providers gRPC and HTTP API that enables the external system to interact with the broker.
type Admin struct {
config Config
Expand All @@ -46,26 +62,32 @@ type Admin struct {
store *store
}

func (a *Admin) registerHTTP() (err error) {
mux := runtime.NewServeMux(runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{OrigName: true, EmitDefaults: true}))

func (a *Admin) registerHTTP(mux *runtime.ServeMux) (err error) {
err = RegisterClientServiceHandlerFromEndpoint(
context.Background(),
mux,
a.config.GRPC.Addr,
[]grpc.DialOption{grpc.WithInsecure()})
[]grpc.DialOption{grpc.WithInsecure()},
)
if err != nil {
return err
}

err = RegisterSubscriptionServiceHandlerFromEndpoint(
context.Background(),
mux,
a.config.GRPC.Addr,
[]grpc.DialOption{grpc.WithInsecure()})

[]grpc.DialOption{grpc.WithInsecure()},
)
if err != nil {
return err
}
err = RegisterPublishServiceHandlerFromEndpoint(
context.Background(),
mux,
a.config.GRPC.Addr,
[]grpc.DialOption{grpc.WithInsecure()})
[]grpc.DialOption{grpc.WithInsecure()},
)

if err != nil {
return err
Expand All @@ -92,9 +114,34 @@ func (a *Admin) Load(service server.Server) error {
grpc_prometheus.UnaryServerInterceptor),
)
a.grpcServer = s

RegisterClientServiceServer(s, &clientService{a: a})
RegisterSubscriptionServiceServer(s, &subscriptionService{a: a})
RegisterPublishServiceServer(s, &publisher{a: a})
mux := runtime.NewServeMux(runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{OrigName: true, EmitDefaults: true}))
if a.config.HTTP.Enable {
err := a.registerHTTP(mux)
if err != nil {
return err
}
}

for _, v := range service.Plugins() {
if v, ok := v.(GRPCRegister); ok {
v.RegisterGRPC(s)
}

if v, ok := v.(HTTPRegister); a.config.HTTP.Enable && ok {
err := v.RegisterHTTP(context.Background(),
mux,
a.config.GRPC.Addr,
[]grpc.DialOption{grpc.WithInsecure()})
if err != nil {
return err
}
}

}
l, err := net.Listen("tcp", a.config.GRPC.Addr)
if err != nil {
return err
Expand All @@ -111,9 +158,6 @@ func (a *Admin) Load(service server.Server) error {
panic(err)
}
}()
if a.config.HTTP.Enable {
err = a.registerHTTP()
}
return err
}

Expand All @@ -128,23 +172,3 @@ func (a *Admin) Unload() error {
func (a *Admin) Name() string {
return Name
}

func getPage(reqPage, reqPageSize uint32) (page, pageSize uint) {
page = 1
pageSize = 20
if reqPage != 0 {
page = uint(reqPage)
}
if reqPageSize != 0 {
pageSize = uint(reqPageSize)
}
return
}

func InvalidArgument(name string, msg string) error {
errString := "invalid " + name
if msg != "" {
errString = errString + ":" + msg
}
return status.Error(codes.InvalidArgument, errString)
}
9 changes: 6 additions & 3 deletions plugin/admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (c *clientService) mustEmbedUnimplementedClientServiceServer() {

// List lists clients information which the session is valid in the broker (both connected and disconnected).
func (c *clientService) List(ctx context.Context, req *ListClientRequest) (*ListClientResponse, error) {
page, pageSize := getPage(req.Page, req.PageSize)
page, pageSize := GetPage(req.Page, req.PageSize)
clients, total, err := c.a.store.GetClients(page, pageSize)
if err != nil {
return &ListClientResponse{}, err
Expand All @@ -30,9 +30,12 @@ func (c *clientService) List(ctx context.Context, req *ListClientRequest) (*List
// Get returns the client information for given request client id.
func (c *clientService) Get(ctx context.Context, req *GetClientRequest) (*GetClientResponse, error) {
if req.ClientId == "" {
return nil, InvalidArgument("client_id", "")
return nil, ErrInvalidArgument("client_id", "")
}
client := c.a.store.GetClientByID(req.ClientId)
if client == nil {
return nil, ErrNotFound
}
return &GetClientResponse{
Client: client,
}, nil
Expand All @@ -41,7 +44,7 @@ func (c *clientService) Get(ctx context.Context, req *GetClientRequest) (*GetCli
// Delete force disconnect.
func (c *clientService) Delete(ctx context.Context, req *DeleteClientRequest) (*empty.Empty, error) {
if req.ClientId == "" {
return nil, InvalidArgument("client_id", "")
return nil, ErrInvalidArgument("client_id", "")
}
if req.CleanSession {
c.a.clientService.TerminateSession(req.ClientId)
Expand Down
5 changes: 2 additions & 3 deletions plugin/admin/client.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions plugin/admin/client_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions plugin/admin/protos/client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,20 @@ message Client {


service ClientService {
// lists clients
// List clients
rpc List (ListClientRequest) returns (ListClientResponse){
option (google.api.http) = {
get: "/v1/clients"
};
}
// Get the client for given client id.
// Return NotFound error when client not found.
rpc Get (GetClientRequest) returns (GetClientResponse){
option (google.api.http) = {
get: "/v1/clients/{client_id}"
};
}
// disconnect the client for given client id.
// Disconnect the client for given client id.
rpc Delete (DeleteClientRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
delete: "/v1/clients/{client_id}"
Expand Down
2 changes: 1 addition & 1 deletion plugin/admin/protos/publish.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ message UserProperties {
}

service PublishService {
// publish message to broker
// Publish message to broker
rpc Publish (PublishRequest) returns (google.protobuf.Empty){
option (google.api.http) = {
post: "/v1/publish"
Expand Down
8 changes: 4 additions & 4 deletions plugin/admin/protos/subscription.proto
Original file line number Diff line number Diff line change
Expand Up @@ -75,26 +75,26 @@ message Subscription {
string client_id = 7;
}
service SubscriptionService {
// list subscriptions.
// List subscriptions.
rpc List (ListSubscriptionRequest) returns (ListSubscriptionResponse){
option (google.api.http) = {
get: "/v1/subscriptions"
};
}
// filter subscriptions, paging is not supported in this api.
// Filter subscriptions, paging is not supported in this API.
rpc Filter(FilterSubscriptionRequest) returns (FilterSubscriptionResponse) {
option (google.api.http) = {
get: "/v1/filter_subscriptions"
};
}
// subscribe topics for the client.
// Subscribe topics for the client.
rpc Subscribe (SubscribeRequest) returns (SubscribeResponse) {
option (google.api.http) = {
post: "/v1/subscribe"
body:"*"
};
}
// unsubscribe topics for the client.
// Unsubscribe topics for the client.
rpc Unsubscribe (UnsubscribeRequest) returns (google.protobuf.Empty) {
option (google.api.http) = {
post: "/v1/unsubscribe"
Expand Down
8 changes: 4 additions & 4 deletions plugin/admin/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ func (p *publisher) mustEmbedUnimplementedPublishServiceServer() {
// Publish publishes a message into broker.
func (p *publisher) Publish(ctx context.Context, req *PublishRequest) (resp *empty.Empty, err error) {
if !packets.ValidV5Topic([]byte(req.TopicName)) {
return nil, InvalidArgument("topic_name", "")
return nil, ErrInvalidArgument("topic_name", "")
}
if req.Qos > uint32(packets.Qos2) {
return nil, InvalidArgument("qos", "")
return nil, ErrInvalidArgument("qos", "")
}
if req.PayloadFormat != 0 && req.PayloadFormat != 1 {
return nil, InvalidArgument("payload_format", "")
return nil, ErrInvalidArgument("payload_format", "")
}
if req.ResponseTopic != "" && !packets.ValidV5Topic([]byte(req.ResponseTopic)) {
return nil, InvalidArgument("response_topic", "")
return nil, ErrInvalidArgument("response_topic", "")
}
var userPpt []packets.UserProperty
for _, v := range req.UserProperties {
Expand Down
5 changes: 2 additions & 3 deletions plugin/admin/publish.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions plugin/admin/publish_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 215d372

Please sign in to comment.