Skip to content

Commit

Permalink
removed github.com/imkira/go-observer dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
hallgren committed Dec 23, 2019
1 parent 3490adf commit ae36126
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 29 deletions.
9 changes: 3 additions & 6 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,27 @@ import (
"github.com/hallgren/eventsourcing"
"github.com/hallgren/eventsourcing/eventstore/memory"
"github.com/hallgren/eventsourcing/serializer/unsafe"
"github.com/imkira/go-observer"
"time"
)

func main() {
// go-observer to write and read events async
prop := observer.NewProperty(nil)
stream := prop.Observe()
var c = make(chan eventsourcing.Event)
// Setup a memory based event store
eventStore := memory.Create(unsafe.New())
repo := eventsourcing.NewRepository(eventStore, nil)
f := func(e eventsourcing.Event) {
fmt.Printf("Event from stream %q\n", e)
// Its a good practice making this function as fast as possible not blocking the event sourcing call for to long
// Here we use the go-observer pkg to store the events in a stream to be consumed async
prop.Update(e)
c <- e
}
repo.Subscribe(f)

// Read the event stream async
go func() {
for {
// advance to next value
event := stream.WaitNext().(eventsourcing.Event)
event := <-c
fmt.Println("STREAM EVENT")
fmt.Println(event)
}
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,5 @@ go 1.13
require (
github.com/etcd-io/bbolt v1.3.3
github.com/gofrs/uuid v3.2.0+incompatible
github.com/imkira/go-observer v1.0.3
github.com/mattn/go-sqlite3 v1.11.0
)
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,5 @@ github.com/etcd-io/bbolt v1.3.3 h1:gSJmxrs37LgTqR/oyJBWok6k6SvXEUerFTbltIhXkBM=
github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHjkjCrw=
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/imkira/go-observer v1.0.3 h1:l45TYAEeAB4L2xF6PR2gRLn2NE5tYhudh33MLmC7B80=
github.com/imkira/go-observer v1.0.3/go.mod h1:zLzElv2cGTHufQG17IEILJMPDg32TD85fFgKyFv00wU=
github.com/mattn/go-sqlite3 v1.11.0 h1:LDdKkqtYlom37fkvqs8rMPFKAMe8+SgjbwZ6ex1/A/Q=
github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
45 changes: 25 additions & 20 deletions repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"github.com/hallgren/eventsourcing/eventstore/memory"
"github.com/hallgren/eventsourcing/serializer/json"
"github.com/hallgren/eventsourcing/snapshotstore"
"github.com/imkira/go-observer"
"testing"
"time"
)

func TestSaveAndGetAggregate(t *testing.T) {
Expand Down Expand Up @@ -113,10 +111,9 @@ func TestSaveSnapshotWithoutSnapshotStore(t *testing.T) {
}

func TestSubscription(t *testing.T) {
prop := observer.NewProperty(nil)
stream := prop.Observe()
counter := 0
f := func(e eventsourcing.Event) {
prop.Update(e)
counter++
}
serializer := json.New()
serializer.Register(&Person{}, &Born{}, &AgedOneYear{})
Expand All @@ -135,25 +132,33 @@ func TestSubscription(t *testing.T) {
if err != nil {
t.Fatal("could not save aggregate")
}
if counter != 4 {
t.Errorf("No global events was received from the stream, got %q", counter)
}
}

func TestSubscriptionSpecific(t *testing.T) {
counter := 0
outer:
for {
select {
// wait for changes
case <-stream.Changes():
// advance to next value
stream.Next()
counter++
if counter == 4 {
return
}
case <-time.After(100 * time.Millisecond):
// The stream has 10 milliseconds to deliver the events
break outer
}
f := func(e eventsourcing.Event) {
counter++
}
serializer := json.New()
serializer.Register(&Person{}, &Born{}, &AgedOneYear{})
repo := eventsourcing.NewRepository(memory.Create(serializer), nil)
repo.Subscribe(f, &Born{}, &AgedOneYear{})

person, err := CreatePerson("kalle")
if err != nil {
t.Fatal(err)
}
person.GrowOlder()
person.GrowOlder()
person.GrowOlder()

err = repo.Save(person)
if err != nil {
t.Fatal("could not save aggregate")
}
if counter != 4 {
t.Errorf("No global events was received from the stream, got %q", counter)
}
Expand Down

0 comments on commit ae36126

Please sign in to comment.