From 4fe26438f0a4213a0551bec8ec224750bdae3e5c Mon Sep 17 00:00:00 2001 From: Per Buer Date: Mon, 11 Dec 2023 16:03:47 +0100 Subject: [PATCH] New benchmark suite (#97) * added max size for batching messages * default message batch size * wip: chaos. * Add benchmark actor and protobuf message The update introduces a benchmark actor that receives messages and increments a message counter. It also includes the creation of a new protobuf message. The main function is updated to incorporate this benchmark actor, while providing a detailed simulation of sending messages across multiple actors distributed across different engines. * Refactor benchmark code and add profiling The benchmark code is refactored for better error handling and message checks. 'benchmark' function is extracted from 'main' for further testing. A new Makefile target 'bench-profile' is added for profiling. A new test file 'main_test.go' is created for benchmark testing. Corresponding instructions are added in the newly created README.md file. Also, .gitignore is updated to exclude the created test, cpu, and memory profiles. * document how to use the interactive web interface. * add latency measurement as well. * bah. can't make go test ignore the new latency tests. I just commented it out. Gonna revisit latency benchmarks later. * Update benchmark command in Makefile The benchmark command in the Makefile has been updated to run the whole package, not just main.go --------- Co-authored-by: anthdm --- .gitignore | 5 +- Makefile | 5 +- _bench/README.md | 43 +++++++ _bench/main.go | 236 +++++++++++++++++++++++++++++++------- _bench/main_test.go | 36 ++++++ _bench/message.pb.go | 266 +++++++++++++++++++++++++++++++++++++++++++ _bench/message.proto | 15 +++ 7 files changed, 561 insertions(+), 45 deletions(-) create mode 100644 _bench/README.md create mode 100644 _bench/main_test.go create mode 100644 _bench/message.pb.go create mode 100644 _bench/message.proto diff --git a/.gitignore b/.gitignore index 1e5d8b0..e0dc542 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,7 @@ TODO _test /.vscode /.idea -.DS_Store \ No newline at end of file +.DS_Store +/_bench.test +/cpu.prof +/mem.prof diff --git a/Makefile b/Makefile index a305483..5975291 100644 --- a/Makefile +++ b/Makefile @@ -17,6 +17,9 @@ build: go build -o bin/chatclient examples/chat/client/main.go bench: - go run _bench/main.go + go run ./_bench/. + +bench-profile: + go test -bench='^BenchmarkHollywood$$' -run=NONE -cpuprofile cpu.prof -memprofile mem.prof ./_bench .PHONY: proto diff --git a/_bench/README.md b/_bench/README.md new file mode 100644 index 0000000..484856c --- /dev/null +++ b/_bench/README.md @@ -0,0 +1,43 @@ +# Benchmark suite for Hollywood + +This is a benchmark suite for the Hollywood framework. It spins up a number of engines, a whole lot of actors +and then sends messages between them. + +## Running the benchmark +``` +make bench +``` + +## Profiling the benchmark + +We can use the `pprof` tool to profile the benchmark. First, we need to run the benchmark with profiling enabled: +``` +make bench-profile +``` + +This will run the benchmark and generate a CPU and a memory profile. We can then use the `pprof` tool to analyze the +profiles. + + +## Analyzing the profiles + +### For CPU profile, basic view +``` +go tool pprof cpu.prof +> web +``` + +### For Memory profile, basic view +``` +go tool pprof mem.prof +> web +``` + +### Fancy web interface +``` +go tool pprof -http=:8080 cpu.prof +``` +and +``` +go tool pprof -http=:8080 mem.prof +``` \ No newline at end of file diff --git a/_bench/main.go b/_bench/main.go index f218224..9167e87 100644 --- a/_bench/main.go +++ b/_bench/main.go @@ -1,74 +1,224 @@ package main import ( + "errors" "fmt" - "log" + "github.com/anthdm/hollywood/actor" + "github.com/anthdm/hollywood/remote" "log/slog" + "math/rand" "os" "runtime" + "sync" + "sync/atomic" "time" +) - "github.com/anthdm/hollywood/actor" - "github.com/anthdm/hollywood/remote" +//go:generate protoc --proto_path=. --go_out=. --go_opt=paths=source_relative message.proto + +type monitor struct { +} + +func (m *monitor) Receive(ctx *actor.Context) { + switch ctx.Message().(type) { + case actor.Initialized: + ctx.Engine().BroadcastEvent(&actor.EventSub{}) + case actor.DeadLetterEvent: + deadLetters.Add(1) + } +} +func newMonitor() actor.Receiver { + return &monitor{} +} + +type benchMarkActor struct { + internalMessageCount int64 +} + +var ( + receiveCount *atomic.Int64 + sendCount *atomic.Int64 + deadLetters *atomic.Int64 ) -func makeRemoteEngine(addr string) *actor.Engine { - r := remote.New(remote.Config{ListenAddr: addr}) - e, err := actor.NewEngine(actor.EngineOptRemote(r)) - if err != nil { - log.Fatal(err) +func init() { + receiveCount = &atomic.Int64{} + sendCount = &atomic.Int64{} + deadLetters = &atomic.Int64{} +} + +func (b *benchMarkActor) Receive(ctx *actor.Context) { + switch ctx.Message().(type) { + case *Message: + b.internalMessageCount++ + receiveCount.Add(1) + case *Ping: + ctx.Respond(&Pong{}) } - return e } -func benchmarkRemote() { - var ( - a = makeRemoteEngine("127.0.0.1:3000") - b = makeRemoteEngine("127.0.0.1:3001") - pidB = b.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8), actor.WithMaxRestarts(0)) - ) - its := []int{ - 1_000_000, - 10_000_000, +func newActor() actor.Receiver { + return &benchMarkActor{} +} + +type Benchmark struct { + engineCount int + actorsPerEngine int + senders int + engines []*Engine +} + +func (b *Benchmark) randomEngine() *Engine { + return b.engines[rand.Intn(len(b.engines))] +} + +type Engine struct { + engineID int + actors []*actor.PID + engine *actor.Engine + targetEngines []*Engine + monitor *actor.PID +} + +func (e *Engine) randomActor() *actor.PID { + return e.actors[rand.Intn(len(e.actors))] +} +func (e *Engine) randomTargetEngine() *Engine { + return e.targetEngines[rand.Intn(len(e.targetEngines))] +} + +func newBenchmark(engineCount, actorsPerEngine, senders int) *Benchmark { + b := &Benchmark{ + engineCount: engineCount, + actorsPerEngine: actorsPerEngine, + engines: make([]*Engine, engineCount), + senders: senders, } - for i := 0; i < len(its); i++ { - start := time.Now() - for j := 0; j < its[i]; j++ { - a.Send(pidB, pidB) + return b +} +func (b *Benchmark) spawnEngines() error { + for i := 0; i < b.engineCount; i++ { + r := remote.New(remote.Config{ListenAddr: fmt.Sprintf("localhost:%d", 4000+i)}) + e, err := actor.NewEngine(actor.EngineOptRemote(r)) + if err != nil { + return fmt.Errorf("failed to create engine: %w", err) + } + // spawn the monitor + b.engines[i] = &Engine{ + engineID: i, + actors: make([]*actor.PID, b.actorsPerEngine), + engine: e, + monitor: e.Spawn(newMonitor, "monitor"), } - fmt.Printf("[BENCH HOLLYWOOD REMOTE] processed %d messages in %v\n", its[i], time.Since(start)) + } + // now set up the target engines. These are pointers to all the other engines, except the current one. + for i := 0; i < b.engineCount; i++ { + for j := 0; j < b.engineCount; j++ { + if i == j { + continue + } + b.engines[i].targetEngines = append(b.engines[i].targetEngines, b.engines[j]) + } + } + fmt.Printf("spawned %d engines\n", b.engineCount) + return nil } -func benchmarkLocal() { - e, err := actor.NewEngine() - if err != nil { - log.Fatal(err) +func (b *Benchmark) spawnActors() error { + for i := 0; i < b.engineCount; i++ { + for j := 0; j < b.actorsPerEngine; j++ { + id := fmt.Sprintf("engine-%d-actor-%d", i, j) + b.engines[i].actors[j] = b.engines[i].engine.Spawn(newActor, id) + } } - pid := e.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8), actor.WithMaxRestarts(0)) - its := []int{ - 1_000_000, - 10_000_000, + fmt.Printf("spawned %d actors per engine\n", b.actorsPerEngine) + return nil +} +func (b *Benchmark) sendMessages(d time.Duration) error { + wg := sync.WaitGroup{} + wg.Add(b.senders) + deadline := time.Now().Add(d) + for i := 0; i < b.senders; i++ { + go func() { + defer wg.Done() + for time.Now().Before(deadline) { + // pick a random engine to send from + engine := b.randomEngine() + // pick a random target engine: + targetEngine := engine.randomTargetEngine() + // pick a random target actor from the engine + targetActor := targetEngine.randomActor() + // send the message + engine.engine.Send(targetActor, &Message{}) + sendCount.Add(1) + } + }() } - payload := make([]byte, 128) - for i := 0; i < len(its); i++ { - start := time.Now() - for j := 0; j < its[i]; j++ { - e.Send(pid, payload) - } - fmt.Printf("[BENCH HOLLYWOOD LOCAL] processed %d messages in %v\n", its[i], time.Since(start)) + wg.Wait() + time.Sleep(time.Millisecond * 1000) // wait for the messages to be delivered + // compare the global send count with the receive count + if sendCount.Load() != receiveCount.Load() { + return fmt.Errorf("send count and receive count does not match: %d != %d", sendCount.Load(), receiveCount.Load()) } + return nil } -func main() { +func benchmark() error { + const ( + engines = 10 + actorsPerEngine = 2000 + senders = 20 + duration = time.Second * 10 + ) + if runtime.GOMAXPROCS(runtime.NumCPU()) == 1 { - slog.Error("GOMAXPROCS must be greater than 1") - os.Exit(1) + return errors.New("GOMAXPROCS must be greater than 1") } lh := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelError, })) slog.SetDefault(lh) - benchmarkLocal() - benchmarkRemote() + + benchmark := newBenchmark(engines, actorsPerEngine, senders) + err := benchmark.spawnEngines() + if err != nil { + return fmt.Errorf("failed to spawn engines: %w", err) + } + err = benchmark.spawnActors() + if err != nil { + return fmt.Errorf("failed to spawn actors: %w", err) + } + repCh := make(chan struct{}) + go func() { + lastSendCount := sendCount.Load() + for { + select { + case <-repCh: + return + case <-time.After(time.Second): + fmt.Printf("Messages sent per second %d\n", sendCount.Load()-lastSendCount) + lastSendCount = sendCount.Load() + } + } + }() + fmt.Printf("Send storm starting, will send for %v using %d workers\n", duration, senders) + err = benchmark.sendMessages(duration) + if err != nil { + return fmt.Errorf("failed to send messages: %w", err) + } + close(repCh) + fmt.Printf("Concurrent senders: %d messages sent %d, messages received %d - duration: %v\n", senders, sendCount.Load(), receiveCount.Load(), duration) + fmt.Printf("messages per second: %d\n", receiveCount.Load()/int64(duration.Seconds())) + fmt.Printf("deadletters: %d\n", deadLetters.Load()) + return nil +} + +func main() { + err := benchmark() + if err != nil { + slog.Error("failed to run benchmark", "err", err) + os.Exit(1) + } + } diff --git a/_bench/main_test.go b/_bench/main_test.go new file mode 100644 index 0000000..86e9e55 --- /dev/null +++ b/_bench/main_test.go @@ -0,0 +1,36 @@ +package main + +import ( + "testing" +) + +func BenchmarkHollywood(b *testing.B) { + err := benchmark() + if err != nil { + b.Fatal(err) + } +} + +/* +func Benchmark_Latency(b *testing.B) { + slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))) + r := remote.New(remote.Config{ListenAddr: "localhost:2013"}) + e, err := actor.NewEngine(actor.EngineOptRemote(r)) + defer r.Stop() + if err != nil { + b.Fatal(err) + } + a := e.Spawn(newActor, "actor") + time.Sleep(10 * time.Millisecond) + b.ResetTimer() + for i := 0; i < b.N; i++ { + res, err := e.Request(a, &Ping{}, 1*time.Millisecond).Result() + if err != nil { + b.Fatal(err) + } + if _, ok := res.(*Pong); !ok { + b.Fatal("unexpected response") + } + } +} +*/ diff --git a/_bench/message.pb.go b/_bench/message.pb.go new file mode 100644 index 0000000..7b90dde --- /dev/null +++ b/_bench/message.pb.go @@ -0,0 +1,266 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v4.25.1 +// source: message.proto + +package main + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Message struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *Message) Reset() { + *x = Message{} + if protoimpl.UnsafeEnabled { + mi := &file_message_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Message) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Message) ProtoMessage() {} + +func (x *Message) ProtoReflect() protoreflect.Message { + mi := &file_message_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Message.ProtoReflect.Descriptor instead. +func (*Message) Descriptor() ([]byte, []int) { + return file_message_proto_rawDescGZIP(), []int{0} +} + +func (x *Message) GetData() string { + if x != nil { + return x.Data + } + return "" +} + +type Ping struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *Ping) Reset() { + *x = Ping{} + if protoimpl.UnsafeEnabled { + mi := &file_message_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Ping) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Ping) ProtoMessage() {} + +func (x *Ping) ProtoReflect() protoreflect.Message { + mi := &file_message_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Ping.ProtoReflect.Descriptor instead. +func (*Ping) Descriptor() ([]byte, []int) { + return file_message_proto_rawDescGZIP(), []int{1} +} + +func (x *Ping) GetData() string { + if x != nil { + return x.Data + } + return "" +} + +type Pong struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *Pong) Reset() { + *x = Pong{} + if protoimpl.UnsafeEnabled { + mi := &file_message_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Pong) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Pong) ProtoMessage() {} + +func (x *Pong) ProtoReflect() protoreflect.Message { + mi := &file_message_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Pong.ProtoReflect.Descriptor instead. +func (*Pong) Descriptor() ([]byte, []int) { + return file_message_proto_rawDescGZIP(), []int{2} +} + +func (x *Pong) GetData() string { + if x != nil { + return x.Data + } + return "" +} + +var File_message_proto protoreflect.FileDescriptor + +var file_message_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x04, 0x6d, 0x61, 0x69, 0x6e, 0x22, 0x1d, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x22, 0x1a, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x12, 0x0a, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x22, 0x1a, 0x0a, 0x04, 0x50, 0x6f, 0x6e, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x42, 0x29, 0x5a, 0x27, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x6e, 0x74, 0x68, 0x64, + 0x6d, 0x2f, 0x68, 0x6f, 0x6c, 0x6c, 0x79, 0x77, 0x6f, 0x6f, 0x64, 0x2f, 0x5f, 0x62, 0x65, 0x6e, + 0x63, 0x68, 0x2f, 0x6d, 0x61, 0x69, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_message_proto_rawDescOnce sync.Once + file_message_proto_rawDescData = file_message_proto_rawDesc +) + +func file_message_proto_rawDescGZIP() []byte { + file_message_proto_rawDescOnce.Do(func() { + file_message_proto_rawDescData = protoimpl.X.CompressGZIP(file_message_proto_rawDescData) + }) + return file_message_proto_rawDescData +} + +var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_message_proto_goTypes = []interface{}{ + (*Message)(nil), // 0: main.Message + (*Ping)(nil), // 1: main.Ping + (*Pong)(nil), // 2: main.Pong +} +var file_message_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_message_proto_init() } +func file_message_proto_init() { + if File_message_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_message_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Message); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_message_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Ping); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_message_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Pong); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_message_proto_rawDesc, + NumEnums: 0, + NumMessages: 3, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_message_proto_goTypes, + DependencyIndexes: file_message_proto_depIdxs, + MessageInfos: file_message_proto_msgTypes, + }.Build() + File_message_proto = out.File + file_message_proto_rawDesc = nil + file_message_proto_goTypes = nil + file_message_proto_depIdxs = nil +} diff --git a/_bench/message.proto b/_bench/message.proto new file mode 100644 index 0000000..ea3ef17 --- /dev/null +++ b/_bench/message.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; +package main; +option go_package = "github.com/anthdm/hollywood/_bench/main"; + +message Message { + string data = 1; +} + +message Ping { + string data = 1; +} + +message Pong { + string data = 1; +} \ No newline at end of file