Skip to content

Commit

Permalink
Merge pull request #31 from hallgren/sync_eventstream_updates
Browse files Browse the repository at this point in the history
add lock to event stream to run all subscribing func in sync with the events
  • Loading branch information
hallgren authored Dec 5, 2019
2 parents 436682a + cb57c4a commit 1bf0bc3
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 44 deletions.
36 changes: 22 additions & 14 deletions eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,43 @@ package eventsourcing

import (
"reflect"
"sync"
)

type EventStream struct {
specificEvents map[reflect.Type][]func(e Event)
allEvents []func(e Event)
specificEvents map[reflect.Type][]func(e Event)
allEvents []func(e Event)
publishLock sync.Mutex
}

// NewEventStream factory function
func NewEventStream() *EventStream {
return &EventStream{
specificEvents: make(map[reflect.Type][]func(e Event)),
allEvents: []func(e Event){},
specificEvents: make(map[reflect.Type][]func(e Event)),
allEvents: []func(e Event){},
}
}

// Update calls the functions that are subscribing to event
func (e *EventStream) Update(event Event) {
// call all functions that has registered for the specific event
t := reflect.TypeOf(event.Data)
if functions, ok := e.specificEvents[t]; ok {
for _, f := range functions {
f(event)
func (e *EventStream) Update(events []Event) {
// the lock prevent other event updates get mixed with this update
e.publishLock.Lock()
for _, event := range events {

// call all functions that has registered for the specific event
t := reflect.TypeOf(event.Data)
if functions, ok := e.specificEvents[t]; ok {
for _, f := range functions {
f(event)
}
}
}

// call all functions that has registered for all events
for _, f := range e.allEvents {
f(event)
// call all functions that has registered for all events
for _, f := range e.allEvents {
f(event)
}
}
e.publishLock.Unlock()
}

// Subscribe bind the f function to be called either when all or specific events are created in the system
Expand Down
88 changes: 68 additions & 20 deletions eventstream_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package eventsourcing_test

import (
"github.com/hallgren/eventsourcing"
"sync"
"testing"

"github.com/hallgren/eventsourcing"
)

type AnEvent struct {
Expand All @@ -11,8 +13,8 @@ type AnEvent struct {

type AnotherEvent struct{}

var event = eventsourcing.Event{Version: 123, Data: &AnEvent{Name: "123"}}
var otherEvent = eventsourcing.Event{Version: 123, Data: &AnotherEvent{}}
var event = eventsourcing.Event{Version: 123, Data: &AnEvent{Name: "123"}, Reason: "AnEvent"}
var otherEvent = eventsourcing.Event{Version: 123, Data: &AnotherEvent{}, Reason: "AnotherEvent"}

func TestGlobal(t *testing.T) {
var streamEvent *eventsourcing.Event
Expand All @@ -21,7 +23,7 @@ func TestGlobal(t *testing.T) {
streamEvent = &e
}
e.Subscribe(f)
e.Update(event)
e.Update([]eventsourcing.Event{event})

//time.Sleep(1 * time.Second)
if streamEvent == nil {
Expand All @@ -32,15 +34,14 @@ func TestGlobal(t *testing.T) {
}
}


func TestSpecific(t *testing.T) {
var streamEvent *eventsourcing.Event
e := eventsourcing.NewEventStream()
f := func(e eventsourcing.Event) {
streamEvent = &e
}
e.Subscribe(f,&AnEvent{})
e.Update(event)
e.Subscribe(f, &AnEvent{})
e.Update([]eventsourcing.Event{event})

if streamEvent == nil {
t.Fatalf("should have received event")
Expand All @@ -58,8 +59,8 @@ func TestManySpecific(t *testing.T) {
streamEvents = append(streamEvents, &e)
}
e.Subscribe(f, &AnEvent{}, &AnotherEvent{})
e.Update(event)
e.Update(otherEvent)
e.Update([]eventsourcing.Event{event})
e.Update([]eventsourcing.Event{otherEvent})

if streamEvents == nil {
t.Fatalf("should have received event")
Expand Down Expand Up @@ -88,22 +89,22 @@ func TestUpdateNoneSubscribedEvent(t *testing.T) {
streamEvent = &e
}
e.Subscribe(f, &AnotherEvent{})
e.Update(event)
e.Update([]eventsourcing.Event{event})

if streamEvent != nil {
t.Fatalf("should not have received event %q", streamEvent)
}
}

func TestManySubscribers(t *testing.T) {
streamEvent1 := make([]eventsourcing.Event,0)
streamEvent2 := make([]eventsourcing.Event,0)
streamEvent3 := make([]eventsourcing.Event,0)
streamEvent4 := make([]eventsourcing.Event,0)
streamEvent1 := make([]eventsourcing.Event, 0)
streamEvent2 := make([]eventsourcing.Event, 0)
streamEvent3 := make([]eventsourcing.Event, 0)
streamEvent4 := make([]eventsourcing.Event, 0)

e := eventsourcing.NewEventStream()
f1 := func(e eventsourcing.Event) {
streamEvent1 = append(streamEvent1,e)
streamEvent1 = append(streamEvent1, e)
}
f2 := func(e eventsourcing.Event) {
streamEvent2 = append(streamEvent2, e)
Expand All @@ -112,14 +113,14 @@ func TestManySubscribers(t *testing.T) {
streamEvent3 = append(streamEvent3, e)
}
f4 := func(e eventsourcing.Event) {
streamEvent4 = append(streamEvent4,e)
streamEvent4 = append(streamEvent4, e)
}
e.Subscribe(f1,&AnotherEvent{})
e.Subscribe(f2,&AnotherEvent{}, &AnEvent{})
e.Subscribe(f3,&AnEvent{})
e.Subscribe(f1, &AnotherEvent{})
e.Subscribe(f2, &AnotherEvent{}, &AnEvent{})
e.Subscribe(f3, &AnEvent{})
e.Subscribe(f4)

e.Update(event)
e.Update([]eventsourcing.Event{event})

if len(streamEvent1) != 0 {
t.Fatalf("stream1 should not have any events")
Expand All @@ -137,3 +138,50 @@ func TestManySubscribers(t *testing.T) {
t.Fatalf("stream4 should have one event")
}
}

func TestParallelUpdates(t *testing.T) {
streamEvent := make([]eventsourcing.Event, 0)
e := eventsourcing.NewEventStream()
lock := sync.Mutex{}

// functions to bind to event subscription
f1 := func(e eventsourcing.Event) {
lock.Lock()
streamEvent = append(streamEvent, e)
lock.Unlock()
}
f2 := func(e eventsourcing.Event) {
lock.Lock()
streamEvent = append(streamEvent, e)
lock.Unlock()
}
f3 := func(e eventsourcing.Event) {
lock.Lock()
streamEvent = append(streamEvent, e)
lock.Unlock()
}
e.Subscribe(f1, &AnEvent{})
e.Subscribe(f2, &AnotherEvent{})
e.Subscribe(f3)

// concurrently update the event stream
for i := 1; i < 1000; i++ {
go e.Update([]eventsourcing.Event{otherEvent, otherEvent})
go e.Update([]eventsourcing.Event{event, event})
}

var lastEvent eventsourcing.Event
// check that events comes coupled together in four due to the lock in the event stream that makes sure all registered
// functions are called together and that is not mixed with other events
lock.Lock()
for j, event := range streamEvent {
if j%4 == 0 {
lastEvent = event
} else {
if lastEvent.Reason != event.Reason {
t.Fatal("same event should come in couple of four")
}
}
}
lock.Unlock()
}
9 changes: 1 addition & 8 deletions repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,11 @@ func (r *Repository) Save(aggregate aggregate) error {
}

// publish the saved events to subscribers
// in async to make sure subscribers not blocks the call to Save
events := aggregate.changes()
go func() {
for _, event := range events {
r.eventStream.Update(event)
}
}()

r.eventStream.Update(events)

// aggregate are saved to the event store now its safe to update the internal aggregate state
aggregate.updateVersion()

return nil
}

Expand Down
2 changes: 0 additions & 2 deletions repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ func TestSaveSnapshotWithoutSnapshotStore(t *testing.T) {
}
}


func TestSubscription(t *testing.T) {
prop := observer.NewProperty(nil)
stream := prop.Observe()
Expand Down Expand Up @@ -159,4 +158,3 @@ outer:
t.Errorf("No global events was received from the stream, got %q", counter)
}
}

0 comments on commit 1bf0bc3

Please sign in to comment.