Skip to content

Commit

Permalink
Merge pull request #60 from DrmagicE/federation
Browse files Browse the repository at this point in the history
feat: add �federation plugin.
  • Loading branch information
DrmagicE authored Feb 16, 2021
2 parents afcf8c8 + 43210cb commit dd64ea7
Show file tree
Hide file tree
Showing 55 changed files with 7,162 additions and 285 deletions.
9 changes: 6 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,16 @@ generate-swagger:

# generate mock code
generate-mocks:
@./mock_gen.sh

go-generate:
go generate ./...

run:
run: go-generate
go run ./cmd/gmqttd start -c ./cmd/gmqttd/default_config.yml

# generate all grpc files and mocks and build the go code
build:
build: go-generate
go build -o $(BUILD_DIR)/gmqttd ./cmd/gmqttd

# generate mocks and run short tests
Expand All @@ -127,7 +130,7 @@ test-cover:
test-all: test test-bench test-cover

# Build Golang application binary with settings to enable it to run in a Docker scratch container.
binary: generate-grpc
binary: go-generate
CGO_ENABLED=0 GOOS=linux go build -ldflags '-s' -o $(BUILD_DIR)/gmqttd ./cmd/gmqttd

build-docker:
Expand Down
18 changes: 4 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
[中文文档](https://github.com/DrmagicE/gmqtt/blob/master/README_ZH.md)
# Gmqtt [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go) [![Build Status](https://travis-ci.org/DrmagicE/gmqtt.svg?branch=master)](https://travis-ci.org/DrmagicE/gmqtt) [![codecov](https://codecov.io/gh/DrmagicE/gmqtt/branch/master/graph/badge.svg)](https://codecov.io/gh/DrmagicE/gmqtt) [![Go Report Card](https://goreportcard.com/badge/github.com/DrmagicE/gmqtt)](https://goreportcard.com/report/github.com/DrmagicE/gmqtt)

News: MQTT V5 is now supported. But due to those new features in v5, there area lots of breaking changes.
If you have any migration problems, feel free to raise an issue.
Or you can use the latest v3 [broker](https://github.com/DrmagicE/gmqtt/tree/v0.1.4).
News: Cluster mode is now supported, see [federation plugin](./plugin/federation/README.md) for examples and details.

# Installation
```$ go get -u github.com/DrmagicE/gmqtt```
Expand All @@ -18,11 +16,7 @@ See `Server` interface in `server/server.go` and [admin](https://github.com/Drma
* Provide GRPC and REST APIs to interact with server. (plugin:[admin](https://github.com/DrmagicE/gmqtt/blob/master/plugin/admin/README.md))
* Provide session persistence which means the broker can retrieve the session data after restart.
Currently, only redis backend is supported.



# Limitations
* Cluster is not supported.
* Provide clustering, see [federation plugin](./plugin/federation/README.md) for examples and details.


# Get Started
Expand Down Expand Up @@ -113,7 +107,8 @@ Gmqtt implements the following hooks:
| OnDelivered | When a message is delivered to the client | |
| OnClosed | When the client is closed | |
| OnMsgDropped | When a message is dropped for some reasons| |

| OnWillPublish | When the client is going to deliver a will message | Modify or drop the will message |
| OnWillPublished| When a will message has been delivered| |

See `/examples/hook` for details.

Expand All @@ -132,8 +127,3 @@ $ go test -race ./...
## Integration Test
[paho.mqtt.testing](https://github.com/eclipse/paho.mqtt.testing).


# TODO
* Support bridge mode and cluster.

*Breaking changes may occur when adding this new features.*
13 changes: 4 additions & 9 deletions README_ZH.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Gmqtt [![Mentioned in Awesome Go](https://awesome.re/mentioned-badge.svg)](https://github.com/avelino/awesome-go) [![Build Status](https://travis-ci.org/DrmagicE/Gmqtt.svg?branch=master)](https://travis-ci.org/DrmagicE/Gmqtt) [![codecov](https://codecov.io/gh/DrmagicE/Gmqtt/branch/master/graph/badge.svg)](https://codecov.io/gh/DrmagicE/Gmqtt) [![Go Report Card](https://goreportcard.com/badge/github.com/DrmagicE/Gmqtt)](https://goreportcard.com/report/github.com/DrmagicE/Gmqtt)

News: 现已支持V5版本,但由于V5的功能特性,Gmqtt做了很多不兼容的改动,对此有疑问欢迎提issue交流,或者依然使用最新的[V3版本](https://github.com/DrmagicE/gmqtt/tree/v0.1.4).
News: 集群模式已支持,示例和详情请参考[federation plugin](./plugin/federation/README.md)

Gmqtt是用Go语言实现的一个具备灵活灵活扩展能力,高性能的MQTT broker,其完整实现了MQTT V3.1.1和V5协议。

Expand All @@ -15,9 +15,7 @@ Gmqtt是用Go语言实现的一个具备灵活灵活扩展能力,高性能的M
* 提供监控指标,支持prometheus。 (plugin: [prometheus](https://github.com/DrmagicE/Gmqtt/blob/master/plugin/prometheus/READEME.md))
* GRPC和REST API 支持. (plugin:[admin](https://github.com/DrmagicE/Gmqtt/blob/master/plugin/admin/READEME.md))
* 支持session持久化,broker重启消息不丢失,目前支持redis持久化。

# 缺陷
* 不支持集群。
* 支持集群, 示例和详情请参考[federation plugin](./plugin/federation/README.md)

# 开始

Expand Down Expand Up @@ -103,6 +101,8 @@ Gmqtt实现了下列钩子方法。
| OnDelivered | 消息从broker投递到客户端后调用 | |
| OnClosed | 客户端断开连接后调用 | 统计在线客户端数量 |
| OnMsgDropped | 消息被丢弃时调用 | |
| OnWillPublish | 发布遗嘱消息前 | 修改或丢弃遗嘱消息|
| OnWillPublished| 发布遗嘱消息后| |

`/examples/hook` 中有常用钩子的使用方法介绍。

Expand All @@ -118,8 +118,3 @@ $ go test -race ./...
## 集成测试
[paho.mqtt.testing](https://github.com/eclipse/paho.mqtt.testing).


# TODO
* 桥接模式,集群模式

*暂时不保证向后兼容,在添加上述新功能时可能会有breaking changes。*
15 changes: 9 additions & 6 deletions cmd/gmqttd/command/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,24 +107,27 @@ func NewStartCmd() *cobra.Command {
} else {
must(err)
}
err = c.Validate()
must(err)
pid, err := pidfile.New(c.PidFile)
if err != nil {
must(fmt.Errorf("open pid file failed: %s", err))
if c.PidFile != "" {
pid, err := pidfile.New(c.PidFile)
if err != nil {
must(fmt.Errorf("open pid file failed: %s", err))
}
defer pid.Remove()
}
defer pid.Remove()

tcpListeners, websockets, err := GetListeners(c)
must(err)
l, err := c.GetLogger(c.Log)
must(err)
logger = l

s := server.New(
server.WithConfig(c),
server.WithTCPListener(tcpListeners...),
server.WithWebsocketServer(websockets...),
server.WithLogger(l),
)

err = s.Init()
if err != nil {
fmt.Println(err)
Expand Down
44 changes: 39 additions & 5 deletions cmd/gmqttd/default_config.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Path to pid file, default to /var/run/gmqttd.pid
# pid_file:
# Path to pid file.
# If not set, there will be no pid file.
# pid_file: /var/run/gmqttd.pid

listeners:
# bind address
Expand All @@ -16,15 +17,17 @@ listeners:

api:
grpc:
- address: "unix:///var/run/gmqttd.sock" # The gRPC server listen address.
# The gRPC server listen address. Supports unix socket and tcp socket.
- address: "tcp://127.0.0.1:8084"
#- address: "unix:///var/run/gmqttd.sock"
# tls:
# cacert: "path_to_ca_cert_file"
# cert: "path_to_cert_file"
# key: "path_to_key_file"
http:
# The HTTP server listen address. This is a reverse-proxy server in front of gRPC server.
- address: "tcp://127.0.0.1:8083"
map: "unix:///var/run/gmqttd.sock" # The backend gRPC server endpoint,
map: "tcp://127.0.0.1:8084" # The backend gRPC server endpoint,
# tls:
# cacert: "path_to_ca_cert_file"
# cert: "path_to_cert_file"
Expand Down Expand Up @@ -82,15 +85,46 @@ plugins:
hash: md5
# The file to store password. If it is a relative path, it locates in the same directory as the config file.
# (e.g: ./gmqtt_password => /etc/gmqtt/gmqtt_password.yml)
# Default to ./gmqtt_password.yml
# Defaults to ./gmqtt_password.yml
# password_file:
federation:
# node_name is the unique identifier for the node in the federation. Defaults to hostname.
# node_name:
# fed_addr is the gRPC server listening address for the federation internal communication. Defaults to :8901
fed_addr: :8901
# advertise_fed_addr is used to change the federation gRPC server address that we advertise to other nodes in the cluster.
# Defaults to "fed_addr".However, in some cases, there may be a routable address that cannot be bound.
# If the port is missing, the default federation port (8901) will be used.
advertise_fed_addr: :8901
# gossip_addr is the address that the gossip will listen on, It is used for both UDP and TCP gossip. Defaults to :8902
gossip_addr: :8902
# advertise_gossip_addr is used to change the gossip server address that we advertise to other nodes in the cluster.
# Defaults to "GossipAddr" or the private IP address of the node if the IP in "GossipAddr" is 0.0.0.0.
# If the port is missing, the default gossip port (8902) will be used.
advertise_gossip_addr: :8902

# retry_join is the address of other nodes to join upon starting up.
# If port is missing, the default gossip port (8902) will be used.
#retry_join:
# - 127.0.0.1:8902

# rejoin_after_leave will be pass to "RejoinAfterLeave" in serf configuration.
# It controls our interaction with the snapshot file.
# When set to false (default), a leave causes a Serf to not rejoin the cluster until an explicit join is received.
# If this is set to true, we ignore the leave, and rejoin the cluster on start.
rejoin_after_leave: false
# snapshot_path will be pass to "SnapshotPath" in serf configuration.
# When Serf is started with a snapshot,it will attempt to join all the previously known nodes until one
# succeeds and will also avoid replaying old user events.
snapshot_path:

# plugin loading orders
plugin_order:
# Uncomment auth to enable authentication.
# - auth
- prometheus
- admin
- federation
log:
level: info # debug | info | warn | error
format: text # json | text
Expand Down
15 changes: 8 additions & 7 deletions cmd/gmqttd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package main
import (
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"path"
//"runtime/pprof"
_ "runtime/pprof"

"github.com/spf13/cobra"

Expand Down Expand Up @@ -40,12 +41,12 @@ func init() {
}

func main() {
// f, err := os.Create("cpu.profile")
// if err != nil {
// panic(err)
// }
// pprof.StartCPUProfile(f)
// defer pprof.StopCPUProfile()
//f, err := os.Create("cpu.profile")
//if err != nil {
// panic(err)
//}
//pprof.StartCPUProfile(f)
//defer pprof.StopCPUProfile()
go func() {
http.ListenAndServe(":6060", nil)
}()
Expand Down
4 changes: 4 additions & 0 deletions cmd/gmqttd/plugins.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
//go:generate sh -c "cd ../../ && go run plugin_generate.go"
// generated by plugin_generate.go; DO NOT EDIT

package main

import (
_ "github.com/DrmagicE/gmqtt/plugin/admin"
_ "github.com/DrmagicE/gmqtt/plugin/auth"
_ "github.com/DrmagicE/gmqtt/plugin/federation"
_ "github.com/DrmagicE/gmqtt/plugin/prometheus"
)
14 changes: 5 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package config

import (
"errors"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -38,10 +37,6 @@ func RegisterDefaultPluginConfig(name string, config Configuration) {
// DefaultConfig return the default configuration.
// If config file is not provided, gmqttd will start with DefaultConfig.
func DefaultConfig() Config {
pidFile, err := getDefaultPidFile()
if err != nil {
panic(err)
}
c := Config{
Listeners: DefaultListeners,
MQTT: DefaultMQTTConfig,
Expand All @@ -50,7 +45,6 @@ func DefaultConfig() Config {
Level: "info",
Format: "text",
},
PidFile: pidFile,
Plugins: make(pluginConfig),
Persistence: DefaultPersistenceConfig,
TopicAliasManager: DefaultTopicAliasManager,
Expand Down Expand Up @@ -115,6 +109,7 @@ type Config struct {
Listeners []*ListenerConfig `yaml:"listeners"`
API API `yaml:"api"`
MQTT MQTT `yaml:"mqtt,omitempty"`
GRPC GRPC `yaml:"gRPC"`
Log LogConfig `yaml:"log"`
PidFile string `yaml:"pid_file"`
ConfigDir string `yaml:"config_dir"`
Expand All @@ -127,6 +122,10 @@ type Config struct {
TopicAliasManager TopicAliasManager `yaml:"topic_alias_manager"`
}

type GRPC struct {
Endpoint string `yaml:"endpoint"`
}

type TLSOptions struct {
// CACert is the trust CA certificate file.
CACert string `yaml:"cacert"`
Expand Down Expand Up @@ -175,9 +174,6 @@ func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error {
}

func (c Config) Validate() (err error) {
if c.PidFile == "" {
return errors.New("empty pid_file")
}
err = c.Log.Validate()
if err != nil {
return err
Expand Down
7 changes: 0 additions & 7 deletions config/config_unix.go

This file was deleted.

12 changes: 0 additions & 12 deletions config/config_windows.go

This file was deleted.

8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,25 @@ module github.com/DrmagicE/gmqtt
go 1.14

require (
github.com/golang/mock v1.2.0
github.com/golang/mock v1.4.4
github.com/golang/protobuf v1.4.2
github.com/gomodule/redigo v1.8.2
github.com/google/uuid v1.1.2
github.com/gorilla/websocket v1.4.2
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/hashicorp/go-sockaddr v1.0.0
github.com/hashicorp/logutils v1.0.0
github.com/hashicorp/serf v0.9.5
github.com/iancoleman/strcase v0.1.2
github.com/mitchellh/go-homedir v1.1.0
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.4.0
github.com/spf13/cobra v1.0.0
github.com/stretchr/testify v1.6.1
go.uber.org/zap v1.13.0
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd
golang.org/x/text v0.3.2 // indirect
golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc // indirect
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
google.golang.org/grpc v1.34.0
Expand Down
Loading

0 comments on commit dd64ea7

Please sign in to comment.