Skip to content

Commit

Permalink
New benchmark suite (#97)
Browse files Browse the repository at this point in the history
* added max size for batching messages

* default message batch size

* wip: chaos.

* Add benchmark actor and protobuf message

The update introduces a benchmark actor that receives messages and increments a message counter. It also includes the creation of a new protobuf message. The main function is updated to incorporate this benchmark actor, while providing a detailed simulation of sending messages across multiple actors distributed across different engines.

* Refactor benchmark code and add profiling

The benchmark code is refactored for better error handling and message checks. 'benchmark' function is extracted from 'main' for further testing. A new Makefile target 'bench-profile' is added for profiling. A new test file 'main_test.go' is created for benchmark testing. Corresponding instructions are added in the newly created README.md file. Also, .gitignore is updated to exclude the created test, cpu, and memory profiles.

* document how to use the interactive web interface.

* add latency measurement as well.

* bah. can't make go test ignore the new latency tests. I just commented it out. Gonna revisit latency benchmarks later.

* Update benchmark command in Makefile

The benchmark command in the Makefile has been updated to run the whole package, not just main.go

---------

Co-authored-by: anthdm <[email protected]>
  • Loading branch information
perbu and anthdm authored Dec 11, 2023
1 parent 877828d commit 4fe2643
Show file tree
Hide file tree
Showing 7 changed files with 561 additions and 45 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@ TODO
_test
/.vscode
/.idea
.DS_Store
.DS_Store
/_bench.test
/cpu.prof
/mem.prof
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ build:
go build -o bin/chatclient examples/chat/client/main.go

bench:
go run _bench/main.go
go run ./_bench/.

bench-profile:
go test -bench='^BenchmarkHollywood$$' -run=NONE -cpuprofile cpu.prof -memprofile mem.prof ./_bench

.PHONY: proto
43 changes: 43 additions & 0 deletions _bench/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Benchmark suite for Hollywood

This is a benchmark suite for the Hollywood framework. It spins up a number of engines, a whole lot of actors
and then sends messages between them.

## Running the benchmark
```
make bench
```

## Profiling the benchmark

We can use the `pprof` tool to profile the benchmark. First, we need to run the benchmark with profiling enabled:
```
make bench-profile
```

This will run the benchmark and generate a CPU and a memory profile. We can then use the `pprof` tool to analyze the
profiles.


## Analyzing the profiles

### For CPU profile, basic view
```
go tool pprof cpu.prof
> web
```

### For Memory profile, basic view
```
go tool pprof mem.prof
> web
```

### Fancy web interface
```
go tool pprof -http=:8080 cpu.prof
```
and
```
go tool pprof -http=:8080 mem.prof
```
236 changes: 193 additions & 43 deletions _bench/main.go
Original file line number Diff line number Diff line change
@@ -1,74 +1,224 @@
package main

import (
"errors"
"fmt"
"log"
"github.com/anthdm/hollywood/actor"
"github.com/anthdm/hollywood/remote"
"log/slog"
"math/rand"
"os"
"runtime"
"sync"
"sync/atomic"
"time"
)

"github.com/anthdm/hollywood/actor"
"github.com/anthdm/hollywood/remote"
//go:generate protoc --proto_path=. --go_out=. --go_opt=paths=source_relative message.proto

type monitor struct {
}

func (m *monitor) Receive(ctx *actor.Context) {
switch ctx.Message().(type) {
case actor.Initialized:
ctx.Engine().BroadcastEvent(&actor.EventSub{})
case actor.DeadLetterEvent:
deadLetters.Add(1)
}
}
func newMonitor() actor.Receiver {
return &monitor{}
}

type benchMarkActor struct {
internalMessageCount int64
}

var (
receiveCount *atomic.Int64
sendCount *atomic.Int64
deadLetters *atomic.Int64
)

func makeRemoteEngine(addr string) *actor.Engine {
r := remote.New(remote.Config{ListenAddr: addr})
e, err := actor.NewEngine(actor.EngineOptRemote(r))
if err != nil {
log.Fatal(err)
func init() {
receiveCount = &atomic.Int64{}
sendCount = &atomic.Int64{}
deadLetters = &atomic.Int64{}
}

func (b *benchMarkActor) Receive(ctx *actor.Context) {
switch ctx.Message().(type) {
case *Message:
b.internalMessageCount++
receiveCount.Add(1)
case *Ping:
ctx.Respond(&Pong{})
}
return e
}

func benchmarkRemote() {
var (
a = makeRemoteEngine("127.0.0.1:3000")
b = makeRemoteEngine("127.0.0.1:3001")
pidB = b.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8), actor.WithMaxRestarts(0))
)
its := []int{
1_000_000,
10_000_000,
func newActor() actor.Receiver {
return &benchMarkActor{}
}

type Benchmark struct {
engineCount int
actorsPerEngine int
senders int
engines []*Engine
}

func (b *Benchmark) randomEngine() *Engine {
return b.engines[rand.Intn(len(b.engines))]
}

type Engine struct {
engineID int
actors []*actor.PID
engine *actor.Engine
targetEngines []*Engine
monitor *actor.PID
}

func (e *Engine) randomActor() *actor.PID {
return e.actors[rand.Intn(len(e.actors))]
}
func (e *Engine) randomTargetEngine() *Engine {
return e.targetEngines[rand.Intn(len(e.targetEngines))]
}

func newBenchmark(engineCount, actorsPerEngine, senders int) *Benchmark {
b := &Benchmark{
engineCount: engineCount,
actorsPerEngine: actorsPerEngine,
engines: make([]*Engine, engineCount),
senders: senders,
}
for i := 0; i < len(its); i++ {
start := time.Now()
for j := 0; j < its[i]; j++ {
a.Send(pidB, pidB)
return b
}
func (b *Benchmark) spawnEngines() error {
for i := 0; i < b.engineCount; i++ {
r := remote.New(remote.Config{ListenAddr: fmt.Sprintf("localhost:%d", 4000+i)})
e, err := actor.NewEngine(actor.EngineOptRemote(r))
if err != nil {
return fmt.Errorf("failed to create engine: %w", err)
}
// spawn the monitor
b.engines[i] = &Engine{
engineID: i,
actors: make([]*actor.PID, b.actorsPerEngine),
engine: e,
monitor: e.Spawn(newMonitor, "monitor"),
}
fmt.Printf("[BENCH HOLLYWOOD REMOTE] processed %d messages in %v\n", its[i], time.Since(start))

}
// now set up the target engines. These are pointers to all the other engines, except the current one.
for i := 0; i < b.engineCount; i++ {
for j := 0; j < b.engineCount; j++ {
if i == j {
continue
}
b.engines[i].targetEngines = append(b.engines[i].targetEngines, b.engines[j])
}
}
fmt.Printf("spawned %d engines\n", b.engineCount)
return nil
}

func benchmarkLocal() {
e, err := actor.NewEngine()
if err != nil {
log.Fatal(err)
func (b *Benchmark) spawnActors() error {
for i := 0; i < b.engineCount; i++ {
for j := 0; j < b.actorsPerEngine; j++ {
id := fmt.Sprintf("engine-%d-actor-%d", i, j)
b.engines[i].actors[j] = b.engines[i].engine.Spawn(newActor, id)
}
}
pid := e.SpawnFunc(func(c *actor.Context) {}, "bench", actor.WithInboxSize(1024*8), actor.WithMaxRestarts(0))
its := []int{
1_000_000,
10_000_000,
fmt.Printf("spawned %d actors per engine\n", b.actorsPerEngine)
return nil
}
func (b *Benchmark) sendMessages(d time.Duration) error {
wg := sync.WaitGroup{}
wg.Add(b.senders)
deadline := time.Now().Add(d)
for i := 0; i < b.senders; i++ {
go func() {
defer wg.Done()
for time.Now().Before(deadline) {
// pick a random engine to send from
engine := b.randomEngine()
// pick a random target engine:
targetEngine := engine.randomTargetEngine()
// pick a random target actor from the engine
targetActor := targetEngine.randomActor()
// send the message
engine.engine.Send(targetActor, &Message{})
sendCount.Add(1)
}
}()
}
payload := make([]byte, 128)
for i := 0; i < len(its); i++ {
start := time.Now()
for j := 0; j < its[i]; j++ {
e.Send(pid, payload)
}
fmt.Printf("[BENCH HOLLYWOOD LOCAL] processed %d messages in %v\n", its[i], time.Since(start))
wg.Wait()
time.Sleep(time.Millisecond * 1000) // wait for the messages to be delivered
// compare the global send count with the receive count
if sendCount.Load() != receiveCount.Load() {
return fmt.Errorf("send count and receive count does not match: %d != %d", sendCount.Load(), receiveCount.Load())
}
return nil
}

func main() {
func benchmark() error {
const (
engines = 10
actorsPerEngine = 2000
senders = 20
duration = time.Second * 10
)

if runtime.GOMAXPROCS(runtime.NumCPU()) == 1 {
slog.Error("GOMAXPROCS must be greater than 1")
os.Exit(1)
return errors.New("GOMAXPROCS must be greater than 1")
}
lh := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelError,
}))
slog.SetDefault(lh)
benchmarkLocal()
benchmarkRemote()

benchmark := newBenchmark(engines, actorsPerEngine, senders)
err := benchmark.spawnEngines()
if err != nil {
return fmt.Errorf("failed to spawn engines: %w", err)
}
err = benchmark.spawnActors()
if err != nil {
return fmt.Errorf("failed to spawn actors: %w", err)
}
repCh := make(chan struct{})
go func() {
lastSendCount := sendCount.Load()
for {
select {
case <-repCh:
return
case <-time.After(time.Second):
fmt.Printf("Messages sent per second %d\n", sendCount.Load()-lastSendCount)
lastSendCount = sendCount.Load()
}
}
}()
fmt.Printf("Send storm starting, will send for %v using %d workers\n", duration, senders)
err = benchmark.sendMessages(duration)
if err != nil {
return fmt.Errorf("failed to send messages: %w", err)
}
close(repCh)
fmt.Printf("Concurrent senders: %d messages sent %d, messages received %d - duration: %v\n", senders, sendCount.Load(), receiveCount.Load(), duration)
fmt.Printf("messages per second: %d\n", receiveCount.Load()/int64(duration.Seconds()))
fmt.Printf("deadletters: %d\n", deadLetters.Load())
return nil
}

func main() {
err := benchmark()
if err != nil {
slog.Error("failed to run benchmark", "err", err)
os.Exit(1)
}

}
36 changes: 36 additions & 0 deletions _bench/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main

import (
"testing"
)

func BenchmarkHollywood(b *testing.B) {
err := benchmark()
if err != nil {
b.Fatal(err)
}
}

/*
func Benchmark_Latency(b *testing.B) {
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError})))
r := remote.New(remote.Config{ListenAddr: "localhost:2013"})
e, err := actor.NewEngine(actor.EngineOptRemote(r))
defer r.Stop()
if err != nil {
b.Fatal(err)
}
a := e.Spawn(newActor, "actor")
time.Sleep(10 * time.Millisecond)
b.ResetTimer()
for i := 0; i < b.N; i++ {
res, err := e.Request(a, &Ping{}, 1*time.Millisecond).Result()
if err != nil {
b.Fatal(err)
}
if _, ok := res.(*Pong); !ok {
b.Fatal("unexpected response")
}
}
}
*/
Loading

0 comments on commit 4fe2643

Please sign in to comment.