diff --git a/examples/mdns/Makefile b/examples/mdns/Makefile new file mode 100644 index 0000000..de500e9 --- /dev/null +++ b/examples/mdns/Makefile @@ -0,0 +1,42 @@ +PROJECT_BINARY=mdns +PROJECT_BINARY_OUTPUT=bin + +.PHONY: all + +all: help + +## Build: +tidy: ## Tidy project + @go mod tidy + +clean: ## Cleans temporary folder + @rm -rf ${PROJECT_BINARY_OUTPUT} + @rm -rf ${PROJECT_RELEASER_OUTPUT} + +build: clean tidy build-arm build-amd ## Build all + @echo "DONE" + +build-arm: ## Build for arm64 + @GO111MODULE=on CGO_ENABLED=0 go build -ldflags="-w -s" -o ${PROJECT_BINARY_OUTPUT}/arm/${PROJECT_BINARY} main.go + +build-amd: ## Build for amd64 + @GO111MODULE=on CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -ldflags="-w -s" -o ${PROJECT_BINARY_OUTPUT}/amd/${PROJECT_BINARY} main.go + +test: build ## Run unit tests + @go clean -testcache + @go test ./... + +pre-commit: test ## Checks everything is allright + @echo "Commit Status: OK" + +## Help: +help: ## Show this help. + @echo '' + @echo 'Usage:' + @echo ' make ' + @echo '' + @echo 'Targets:' + @awk 'BEGIN {FS = ":.*?## "} { \ + if (/^[a-zA-Z_-]+:.*?##.*$$/) {printf " %-20s%s\n", $$1, $$2} \ + else if (/^## .*$$/) {printf " %s\n", substr($$1,4)} \ + }' $(MAKEFILE_LIST) diff --git a/examples/mdns/README.md b/examples/mdns/README.md new file mode 100644 index 0000000..f06847c --- /dev/null +++ b/examples/mdns/README.md @@ -0,0 +1,56 @@ +# Remote Engine Discovery via mDNS +This example shows us how service discovery can be done via mDNS. + +# About +mDNS or Multicast DNS can be used to discover services on the local network without the use of an authoritative DNS server. +This enables peer-to-peer discovery. It is important to note that many networks restrict the use of multicasting, which prevents mDNS from functioning. +Notably, multicast cannot be used in any sort of cloud, or shared infrastructure environments. +However it works well in most office, home, or private infrastructure environments. + +# Quickstart + +> The **[examples](https://github.com/anthdm/hollywood/tree/master/examples/mdns)** folder is the best place to explore Hollywood's Service Discovery + +``` +make build +``` + +# Flow + 0. When start engine with remote configuration, mDNS starting to announce itself and searches for other nodes + 1. If node founds new engine, discovery actor publishes a `DiscoveryEvent` via `actor.EventStream` + 2. (For demo purposes) An `chat` actor receives the discovery event and try to send a message to validate the flow. + +# Execution + +``` +./bin/arm/mdns -port 4001 + + INFO[0000] [REMOTE] server started listenAddr="127.0.0.1:4001" + TRAC[0000] [EVENTSTREAM] subscribe id=1432518515 subs=1 + TRAC[0000] [PROCESS] started pid="127.0.0.1:4001/chat" + TRAC[0000] [PROCESS] started pid="127.0.0.1:4001/mdns" + INFO[0001] [DISCOVERY] remote discovered ID=engine_1682994946742073000 addrs="127.0.0.1:4002" + TRAC[0001] [STREAM WRITER] connected remote="127.0.0.1:4002" + TRAC[0001] [STREAM ROUTER] new stream route pid="127.0.0.1:4001/stream/127.0.0.1:4002" + INFO[0001] new message fields.msg=hello +``` +``` +./bin/arm/mdns -port 4002 + + INFO[0000] [REMOTE] server started listenAddr="127.0.0.1:4002" + TRAC[0000] [EVENTSTREAM] subscribe id=1432518515 subs=1 + TRAC[0000] [PROCESS] started pid="127.0.0.1:4002/chat" + TRAC[0000] [PROCESS] started pid="127.0.0.1:4002/mdns" + INFO[0000] [DISCOVERY] remote discovered ID=engine_1682994395132833000 addrs="127.0.0.1:4001" + TRAC[0000] [INBOX] started pid="127.0.0.1:4002/stream/127.0.0.1:4001" + TRAC[0000] [STREAM WRITER] connected remote="127.0.0.1:4001" + TRAC[0000] [STREAM ROUTER] new stream route pid="127.0.0.1:4002/stream/127.0.0.1:4001" + INFO[0000] new message fields.msg=hello +``` + +# References +- [mDNS](https://github.com/grandcat/zeroconf.git) library for golang + +# License + +Hollywood is licensed under the MIT licence. diff --git a/examples/mdns/chat/actor.go b/examples/mdns/chat/actor.go new file mode 100644 index 0000000..95706b4 --- /dev/null +++ b/examples/mdns/chat/actor.go @@ -0,0 +1,56 @@ +package chat + +import ( + "github.com/anthdm/hollywood/actor" + "github.com/anthdm/hollywood/examples/mdns/chat/types" + "github.com/anthdm/hollywood/examples/mdns/discovery" + "github.com/anthdm/hollywood/log" +) + +type server struct { + eventStream *actor.EventStream + subscription *actor.EventSub + ctx *actor.Context +} + +func New(e *actor.EventStream) actor.Producer { + return func() actor.Receiver { + ret := &server{ + eventStream: e, + } + return ret + } +} + +func (s *server) Receive(ctx *actor.Context) { + switch msg := ctx.Message().(type) { + case actor.Initialized: + s.ctx = ctx + s.subscription = s.eventStream.Subscribe(s.onMessage) + case actor.Started: + _ = msg + case actor.Stopped: + s.shutdown() + case *types.Message: + s.handleMessage(ctx, msg) + } +} +func (s *server) onMessage(event any) { + switch evt := event.(type) { + case *discovery.DiscoveryEvent: + pid := actor.NewPID(evt.Addr[0], "chat") + s.ctx.Engine().Send(pid, &types.Message{ + Username: evt.ID, + Msg: "hello", + }) + } +} + +func (s *server) shutdown() { + s.eventStream.Unsubscribe(s.subscription) +} + +// handle the incoming message by broadcasting it to all connected clients. +func (s *server) handleMessage(ctx *actor.Context, msg *types.Message) { + log.Infow("new message", log.M{"msg": msg.Msg}) +} diff --git a/examples/mdns/chat/types/types.pb.go b/examples/mdns/chat/types/types.pb.go new file mode 100644 index 0000000..4389686 --- /dev/null +++ b/examples/mdns/chat/types/types.pb.go @@ -0,0 +1,268 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.21.12 +// source: chat/types/types.proto + +package types + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Disconnect struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *Disconnect) Reset() { + *x = Disconnect{} + if protoimpl.UnsafeEnabled { + mi := &file_chat_types_types_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Disconnect) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Disconnect) ProtoMessage() {} + +func (x *Disconnect) ProtoReflect() protoreflect.Message { + mi := &file_chat_types_types_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Disconnect.ProtoReflect.Descriptor instead. +func (*Disconnect) Descriptor() ([]byte, []int) { + return file_chat_types_types_proto_rawDescGZIP(), []int{0} +} + +type Connect struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Username string `protobuf:"bytes,1,opt,name=username,proto3" json:"username,omitempty"` +} + +func (x *Connect) Reset() { + *x = Connect{} + if protoimpl.UnsafeEnabled { + mi := &file_chat_types_types_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Connect) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Connect) ProtoMessage() {} + +func (x *Connect) ProtoReflect() protoreflect.Message { + mi := &file_chat_types_types_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Connect.ProtoReflect.Descriptor instead. +func (*Connect) Descriptor() ([]byte, []int) { + return file_chat_types_types_proto_rawDescGZIP(), []int{1} +} + +func (x *Connect) GetUsername() string { + if x != nil { + return x.Username + } + return "" +} + +type Message struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Username string `protobuf:"bytes,1,opt,name=username,proto3" json:"username,omitempty"` + Msg string `protobuf:"bytes,2,opt,name=msg,proto3" json:"msg,omitempty"` +} + +func (x *Message) Reset() { + *x = Message{} + if protoimpl.UnsafeEnabled { + mi := &file_chat_types_types_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_chat_types_types_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_chat_types_types_proto_rawDescGZIP(), []int{2} +} + +func (x *Message) GetUsername() string { + if x != nil { + return x.Username + } + return "" +} + +func (x *Message) GetMsg() string { + if x != nil { + return x.Msg + } + return "" +} + +var File_chat_types_types_proto protoreflect.FileDescriptor + +var file_chat_types_types_proto_rawDesc = []byte{ + 0x0a, 0x16, 0x63, 0x68, 0x61, 0x74, 0x2f, 0x74, 0x79, 0x70, 0x65, 0x73, 0x2f, 0x74, 0x79, 0x70, + 0x65, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x74, 0x79, 0x70, 0x65, 0x73, 0x22, + 0x0c, 0x0a, 0x0a, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x22, 0x25, 0x0a, + 0x07, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x75, 0x73, 0x65, 0x72, + 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x75, 0x73, 0x65, 0x72, + 0x6e, 0x61, 0x6d, 0x65, 0x22, 0x37, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, + 0x1a, 0x0a, 0x08, 0x75, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x08, 0x75, 0x73, 0x65, 0x72, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6d, + 0x73, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x42, 0x36, 0x5a, + 0x34, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x6e, 0x74, 0x68, + 0x64, 0x6d, 0x2f, 0x68, 0x6f, 0x6c, 0x6c, 0x79, 0x77, 0x6f, 0x6f, 0x64, 0x2f, 0x65, 0x78, 0x61, + 0x6d, 0x70, 0x6c, 0x65, 0x73, 0x2f, 0x6d, 0x64, 0x6e, 0x73, 0x2f, 0x63, 0x68, 0x61, 0x74, 0x2f, + 0x74, 0x79, 0x70, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_chat_types_types_proto_rawDescOnce sync.Once + file_chat_types_types_proto_rawDescData = file_chat_types_types_proto_rawDesc +) + +func file_chat_types_types_proto_rawDescGZIP() []byte { + file_chat_types_types_proto_rawDescOnce.Do(func() { + file_chat_types_types_proto_rawDescData = protoimpl.X.CompressGZIP(file_chat_types_types_proto_rawDescData) + }) + return file_chat_types_types_proto_rawDescData +} + +var file_chat_types_types_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_chat_types_types_proto_goTypes = []interface{}{ + (*Disconnect)(nil), // 0: types.Disconnect + (*Connect)(nil), // 1: types.Connect + (*Message)(nil), // 2: types.Message +} +var file_chat_types_types_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_chat_types_types_proto_init() } +func file_chat_types_types_proto_init() { + if File_chat_types_types_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_chat_types_types_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Disconnect); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_chat_types_types_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Connect); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_chat_types_types_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Message); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_chat_types_types_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_chat_types_types_proto_goTypes, + DependencyIndexes: file_chat_types_types_proto_depIdxs, + MessageInfos: file_chat_types_types_proto_msgTypes, + }.Build() + File_chat_types_types_proto = out.File + file_chat_types_types_proto_rawDesc = nil + file_chat_types_types_proto_goTypes = nil + file_chat_types_types_proto_depIdxs = nil +} diff --git a/examples/mdns/chat/types/types.proto b/examples/mdns/chat/types/types.proto new file mode 100644 index 0000000..1a3f5b3 --- /dev/null +++ b/examples/mdns/chat/types/types.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; +package types; +option go_package = "github.com/anthdm/hollywood/examples/mdns/chat/types"; + +message Disconnect {} + +message Connect { + string username = 1; +} + +message Message { + string username = 1; + string msg = 2; +} diff --git a/examples/mdns/discovery/announcer.go b/examples/mdns/discovery/announcer.go new file mode 100644 index 0000000..f97b177 --- /dev/null +++ b/examples/mdns/discovery/announcer.go @@ -0,0 +1,39 @@ +package discovery + +import ( + "github.com/grandcat/zeroconf" +) + +const ( + serviceName = "_actor.hollywood_" + domain = "local." + host = "pc1" +) + +type announcer struct { + id string + ip []string + port int + server *zeroconf.Server +} + +func newAnnouncer(cfg *discoveryOptions) *announcer { + ret := &announcer{ + id: cfg.id, + ip: cfg.ip, + port: cfg.port, + } + return ret +} + +func (a *announcer) start() { + server, err := zeroconf.RegisterProxy(a.id, serviceName, domain, a.port, host, a.ip, []string{"txtv=0", "lo=1", "la=2"}, nil) + if err != nil { + panic(err) + } + a.server = server +} + +func (a *announcer) shutdown() { + a.server.Shutdown() +} diff --git a/examples/mdns/discovery/mdns.go b/examples/mdns/discovery/mdns.go new file mode 100644 index 0000000..650391e --- /dev/null +++ b/examples/mdns/discovery/mdns.go @@ -0,0 +1,101 @@ +package discovery + +import ( + "context" + "fmt" + "strings" + + "github.com/anthdm/hollywood/actor" + "github.com/anthdm/hollywood/log" + "github.com/grandcat/zeroconf" +) + +type mdns struct { + id string + announcer *announcer + resolver *zeroconf.Resolver + eventStream *actor.EventStream + + ctx context.Context + cancelFn context.CancelFunc +} + +func NewMdnsDiscovery(eventStream *actor.EventStream, opts ...DiscoveryOption) actor.Producer { + ctx, cancel := context.WithCancel(context.Background()) + cfg := applyDiscoveryOptions(opts...) + announcer := newAnnouncer(cfg) + return func() actor.Receiver { + ret := &mdns{ + id: cfg.id, + announcer: announcer, + ctx: ctx, + cancelFn: cancel, + eventStream: eventStream, + } + return ret + } +} + +func (d *mdns) Receive(ctx *actor.Context) { + switch msg := ctx.Message().(type) { + case actor.Initialized: + d.createResolver() + case actor.Started: + go d.startDiscovery(ctx) + d.announcer.start() + case actor.Stopped: + d.shutdown() + _ = msg + } +} + +func (d *mdns) shutdown() { + if d.announcer != nil { + d.announcer.shutdown() + } + d.cancelFn() +} + +func (d *mdns) createResolver() { + resolver, err := zeroconf.NewResolver(nil) + if err != nil { + panic(err) + } + d.resolver = resolver +} + +// Starts multicast dns discovery process. +// Searches matching entries with `serviceName` and `domain`. +func (d *mdns) startDiscovery(c *actor.Context) { + ctx, cancel := context.WithCancel(d.ctx) + defer cancel() + entries := make(chan *zeroconf.ServiceEntry) + go func(results <-chan *zeroconf.ServiceEntry) { + for entry := range results { + d.sendDiscoveryEvent(entry) + } + }(entries) + + err := d.resolver.Browse(ctx, serviceName, domain, entries) + if err != nil { + log.Infow("[DISCOVERY] starting discovery failed", log.M{"err": err.Error()}) + panic(err) + } + <-ctx.Done() +} + +// Sends discovered peer as `DiscoveryEvent` to event stream. +func (d *mdns) sendDiscoveryEvent(entry *zeroconf.ServiceEntry) { + // avoid to discover myself + if entry.Instance != d.id { + event := &DiscoveryEvent{ + ID: entry.Instance, + Addr: []string{}, + } + for _, addr := range entry.AddrIPv4 { + event.Addr = append(event.Addr, fmt.Sprintf("%s:%d", addr.String(), entry.Port)) + } + log.Infow("[DISCOVERY] remote discovered", log.M{"addrs": strings.Join(event.Addr, ","), "ID": entry.Instance}) + d.eventStream.Publish(event) + } +} diff --git a/examples/mdns/discovery/opts.go b/examples/mdns/discovery/opts.go new file mode 100644 index 0000000..22f80ea --- /dev/null +++ b/examples/mdns/discovery/opts.go @@ -0,0 +1,36 @@ +package discovery + +import ( + "fmt" + "time" +) + +type DiscoveryOption func(*discoveryOptions) + +type discoveryOptions struct { + id string // engine global ID + ip []string // engine's ip to listen on + port int // engine's port to accept conns +} + +func applyDiscoveryOptions(opts ...DiscoveryOption) *discoveryOptions { + ret := &discoveryOptions{ + id: fmt.Sprintf("engine_%d", time.Now().UnixNano()), + ip: make([]string, 0), + port: 0, + } + + for _, opt := range opts { + opt(ret) + } + + return ret +} + +// Engine's IP and Port information to announce +func WithAnnounceAddr(ip string, p int) DiscoveryOption { + return func(ao *discoveryOptions) { + ao.ip = append(ao.ip, ip) + ao.port = p + } +} diff --git a/examples/mdns/discovery/types.go b/examples/mdns/discovery/types.go new file mode 100644 index 0000000..ca3ad71 --- /dev/null +++ b/examples/mdns/discovery/types.go @@ -0,0 +1,6 @@ +package discovery + +type DiscoveryEvent struct { + ID string + Addr []string +} diff --git a/examples/mdns/go.mod b/examples/mdns/go.mod new file mode 100644 index 0000000..a4be78d --- /dev/null +++ b/examples/mdns/go.mod @@ -0,0 +1,28 @@ +module github.com/anthdm/hollywood/examples/mdns + +go 1.19 + +require ( + github.com/anthdm/hollywood v0.0.0-20230421092042-3cc6d833dc05 + github.com/grandcat/zeroconf v1.0.0 + google.golang.org/protobuf v1.30.0 +) + +require ( + github.com/cenkalti/backoff v2.2.1+incompatible // indirect + github.com/golang/protobuf v1.5.3 // indirect + github.com/klauspost/cpuid/v2 v2.0.9 // indirect + github.com/miekg/dns v1.1.54 // indirect + github.com/planetscale/vtprotobuf v0.4.0 // indirect + github.com/sirupsen/logrus v1.9.0 // indirect + github.com/zeebo/errs v1.2.2 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect + golang.org/x/mod v0.10.0 // indirect + golang.org/x/net v0.9.0 // indirect + golang.org/x/sys v0.7.0 // indirect + golang.org/x/text v0.9.0 // indirect + golang.org/x/tools v0.8.0 // indirect + google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect + google.golang.org/grpc v1.53.0 // indirect + storj.io/drpc v0.0.32 // indirect +) diff --git a/examples/mdns/go.sum b/examples/mdns/go.sum new file mode 100644 index 0000000..d7a94fc --- /dev/null +++ b/examples/mdns/go.sum @@ -0,0 +1,77 @@ +github.com/anthdm/hollywood v0.0.0-20230421092042-3cc6d833dc05 h1:s4w5cYW0syv6EeaOq+m8OSpqO4pCfhj/Tc+m0OeZBaU= +github.com/anthdm/hollywood v0.0.0-20230421092042-3cc6d833dc05/go.mod h1:OOQl6/HISo57/knniNj8wfd2wNr5kJ2NcsNdPEisT8k= +github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= +github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= +github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/grandcat/zeroconf v1.0.0 h1:uHhahLBKqwWBV6WZUDAT71044vwOTL+McW0mBJvo6kE= +github.com/grandcat/zeroconf v1.0.0/go.mod h1:lTKmG1zh86XyCoUeIHSA4FJMBwCJiQmGfcP2PdzytEs= +github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/miekg/dns v1.1.27/go.mod h1:KNUDUusw/aVsxyTYZM1oqvCicbwhgbNgztCETuNZ7xM= +github.com/miekg/dns v1.1.54 h1:5jon9mWcb0sFJGpnI99tOMhCPyJ+RPVz5b63MQG0VWI= +github.com/miekg/dns v1.1.54/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/planetscale/vtprotobuf v0.4.0 h1:NEI+g4woRaAZgeZ3sAvbtyvMBRjIv5kE7EWYQ8m4JwY= +github.com/planetscale/vtprotobuf v0.4.0/go.mod h1:wm1N3qk9G/4+VM1WhpkLbvY/d8+0PbwYYpP5P5VhTks= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/errs v1.2.2 h1:5NFypMTuSdoySVTqlNs1dEoU21QVamMQJxW/Fii5O7g= +github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= +golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/tools v0.0.0-20191216052735-49a3e744a425/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.8.0 h1:vSDcovVPld282ceKgDimkRSC8kpaH1dgyc9UMzlt84Y= +golang.org/x/tools v0.8.0/go.mod h1:JxBZ99ISMI5ViVkT1tr6tdNmXeTrcpVSD3vZ1RsRdN4= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w= +google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= +google.golang.org/grpc v1.53.0 h1:LAv2ds7cmFV/XTS3XG1NneeENYrXGmorPxsBbptIjNc= +google.golang.org/grpc v1.53.0/go.mod h1:OnIrk0ipVdj4N5d9IUoFUx72/VlD7+jUsHwZgwSMQpw= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +storj.io/drpc v0.0.32 h1:5p5ZwsK/VOgapaCu+oxaPVwO6UwIs+iwdMiD50+R4PI= +storj.io/drpc v0.0.32/go.mod h1:6rcOyR/QQkSTX/9L5ZGtlZaE2PtXTTZl8d+ulSeeYEg= diff --git a/examples/mdns/main.go b/examples/mdns/main.go new file mode 100644 index 0000000..82dfe9b --- /dev/null +++ b/examples/mdns/main.go @@ -0,0 +1,42 @@ +package main + +import ( + "flag" + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/anthdm/hollywood/actor" + "github.com/anthdm/hollywood/examples/mdns/chat" + "github.com/anthdm/hollywood/examples/mdns/discovery" + "github.com/anthdm/hollywood/remote" +) + +var ( + port = flag.Int("port", 4001, "Set the port the service is listening to.") + ip = flag.String("ip", "127.0.0.1", "Set IP a service should be reachable.") +) + +func main() { + flag.Parse() + + engine := actor.NewEngine() + + r := remote.New(engine, remote.Config{ + ListenAddr: fmt.Sprintf("%s:%d", *ip, *port), + }) + engine.WithRemote(r) + engine.Spawn(chat.New(engine.EventStream), "chat") + + // starts mdns discovery + engine.Spawn(discovery.NewMdnsDiscovery( + engine.EventStream, + discovery.WithAnnounceAddr(*ip, *port), + ), "mdns") + + // Clean exit. + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt, syscall.SIGTERM) + <-sig +}