Skip to content

Commit

Permalink
test.GUIDFactory to avoid ErrSequenceExpired on benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
jehiah committed Jan 1, 2021
1 parent 75c12ba commit 2cb1881
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 3 deletions.
15 changes: 15 additions & 0 deletions internal/test/guids.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package test

import (
"sync/atomic"
)

// GUIDFactory is an atomic sequence that can be used for MessageID's for benchmarks
// to avoid ErrSequenceExpired when creating a large number of messages
type GUIDFactory struct {
n int64
}

func (gf *GUIDFactory) NextMessageID() int64 {
return atomic.AddInt64(&gf.n, 1)
}
9 changes: 6 additions & 3 deletions nsqd/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,12 @@ func BenchmarkTopicPut(b *testing.B) {
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()
gf := &test.GUIDFactory{}
b.StartTimer()

for i := 0; i <= b.N; i++ {
topic := nsqd.GetOrCreateTopic(topicName)
msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
msg := NewMessage(guid(gf.NextMessageID()).Hex(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
topic.PutMessage(msg)
}
}
Expand All @@ -260,11 +261,12 @@ func BenchmarkTopicToChannelPut(b *testing.B) {
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()
channel := nsqd.GetOrCreateTopic(topicName).GetOrCreateChannel(channelName)
gf := &test.GUIDFactory{}
b.StartTimer()

for i := 0; i <= b.N; i++ {
topic := nsqd.GetOrCreateTopic(topicName)
msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
msg := NewMessage(guid(gf.NextMessageID()).Hex(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
topic.PutMessage(msg)
}

Expand All @@ -290,6 +292,7 @@ func BenchmarkTopicMessagePump(b *testing.B) {
topic := nsqd.GetOrCreateTopic(topicName)
ch := topic.GetOrCreateChannel("ch")
ctx, cancel := context.WithCancel(context.Background())
gf := &test.GUIDFactory{}

var wg sync.WaitGroup
for i := 0; i < runtime.GOMAXPROCS(0); i++ {
Expand All @@ -308,7 +311,7 @@ func BenchmarkTopicMessagePump(b *testing.B) {

b.StartTimer()
for i := 0; i <= b.N; i++ {
msg := NewMessage(topic.GenerateID(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
msg := NewMessage(guid(gf.NextMessageID()).Hex(), []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
topic.PutMessage(msg)
}
cancel()
Expand Down

0 comments on commit 2cb1881

Please sign in to comment.