diff --git a/actor/inbox.go b/actor/inbox.go index c2bcc9b..eda83be 100644 --- a/actor/inbox.go +++ b/actor/inbox.go @@ -1,6 +1,8 @@ package actor import ( + "runtime" + "github.com/anthdm/hollywood/ggq" "github.com/anthdm/hollywood/log" ) @@ -30,15 +32,15 @@ func (in *Inbox) Consume(msgs []Envelope) { func (in *Inbox) Start(proc Processer) { in.proc = proc - // var lockOSThread bool + var lockOSThread bool // prevent race condition here be reassigning before go routine. - // if LOCK_OS_THREAD { - // lockOSThread = true - // } + if LOCK_OS_THREAD { + lockOSThread = true + } go func() { - // if lockOSThread { - // runtime.LockOSThread() - // } + if lockOSThread { + runtime.LockOSThread() + } in.ggq.ReadN() }() log.Tracew("[INBOX] started", log.M{"pid": proc.PID()}) diff --git a/actor/pid_test.go b/actor/pid_test.go index 528f183..50a5953 100644 --- a/actor/pid_test.go +++ b/actor/pid_test.go @@ -6,33 +6,6 @@ import ( "github.com/stretchr/testify/assert" ) -func BenchmarkXxddd(b *testing.B) { - var key string - var keyint uint64 - pid := NewPID("127.0.0.1:4000", "foobar") - - b.Run("+", func(b *testing.B) { - for i := 0; i < b.N; i++ { - key = pid.Address + pidSeparator + pid.ID - } - }) - b.Run("key", func(b *testing.B) { - for i := 0; i < b.N; i++ { - keyint = pid.LookupKey() - } - }) - - _ = key - _ = keyint -} - -func BenchmarkLookupKey(b *testing.B) { - pid := NewPID("127.0.0.1:3000", "foo") - for i := 0; i < b.N; i++ { - pid.LookupKey() - } -} - func BenchmarkNewPID(b *testing.B) { for i := 0; i < b.N; i++ { NewPID("127.0.0.1:3000", "foo") diff --git a/ggq/ggq.go b/ggq/ggq.go index dd036cf..53a995d 100644 --- a/ggq/ggq.go +++ b/ggq/ggq.go @@ -26,10 +26,6 @@ const ( stateClosed ) -var ( - state atomic.Int32 -) - type slot[T any] struct { item T atomic.Uint32 @@ -41,6 +37,8 @@ type GGQ[T any] struct { _ [cacheLinePadding - unsafe.Sizeof(atomic.Uint32{})]byte read atomic.Uint32 _ [cacheLinePadding - unsafe.Sizeof(atomic.Uint32{})]byte + state atomic.Uint32 + _ [cacheLinePadding - unsafe.Sizeof(atomic.Uint32{})]byte buffer []slot[T] _ [cacheLinePadding]byte mask uint32 @@ -85,10 +83,9 @@ func (q *GGQ[T]) ReadN() (T, bool) { q.read.Store(upper) current = upper runtime.Gosched() - // time.Sleep(time.Nanosecond) } else if upper := q.written.Load(); lower <= upper { runtime.Gosched() - } else if !state.CompareAndSwap(stateClosed, stateRunning) { + } else if !q.state.CompareAndSwap(stateClosed, stateRunning) { time.Sleep(time.Microsecond) } else { break @@ -126,7 +123,7 @@ func (q *GGQ[T]) Read() (T, bool) { case slotBusy: runtime.Gosched() case slotEmpty: - if state.CompareAndSwap(stateClosed, stateRunning) { + if q.state.CompareAndSwap(stateClosed, stateRunning) { var t T return t, true } @@ -141,7 +138,7 @@ func (q *GGQ[T]) Read() (T, bool) { } func (q *GGQ[T]) Close() { - state.Store(stateClosed) + q.state.Store(stateClosed) } func isPOW2(n uint32) bool { diff --git a/go.mod b/go.mod index f9cd9be..f717567 100644 --- a/go.mod +++ b/go.mod @@ -6,18 +6,19 @@ require ( github.com/planetscale/vtprotobuf v0.4.0 github.com/sirupsen/logrus v1.9.0 github.com/stretchr/testify v1.8.1 - github.com/zeebo/xxh3 v1.0.2 google.golang.org/grpc v1.53.0 google.golang.org/protobuf v1.28.1 storj.io/drpc v0.0.32 ) +require github.com/klauspost/cpuid/v2 v2.0.9 // indirect + require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/zeebo/errs v1.2.2 // indirect + github.com/zeebo/xxh3 v1.0.2 golang.org/x/net v0.5.0 // indirect golang.org/x/sys v0.4.0 // indirect golang.org/x/text v0.6.0 // indirect