Skip to content

Commit

Permalink
Merge pull request #99 from hallgren/iterator_interface
Browse files Browse the repository at this point in the history
Iterator interface re-design
  • Loading branch information
hallgren authored Jan 11, 2024
2 parents 104105b + c1319f8 commit fc04d13
Show file tree
Hide file tree
Showing 14 changed files with 93 additions and 91 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ The only thing an event store handles are events, and it must implement the foll

```go
// saves events to the under laying data store.
Save(events []eventsourcing.Event) error
Save(events []core.Event) error

// fetches events based on identifier and type but also after a specific version. The version is used to load event that happened after a snapshot was taken.
Get(id string, aggregateType string, afterVersion eventsourcing.Version) (eventsourcing.Iterator, error)
Get(id string, aggregateType string, afterVersion core.Version) (core.Iterator, error)
```

Currently, there are three implementations.
Expand Down
9 changes: 2 additions & 7 deletions core/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,13 @@ import (
"errors"
)

// ErrNoEvents when there is no events to get
var ErrNoEvents = errors.New("no events")

// ErrNoMoreEvents when iterator has no more events to deliver
var ErrNoMoreEvents = errors.New("no more events")

// ErrConcurrency when the currently saved version of the aggregate differs from the new ones
var ErrConcurrency = errors.New("concurrency error")

// Iterator is the interface an event store Get needs to return
type Iterator interface {
Next() (Event, error)
Next() bool
Value() (Event, error)
Close()
}

Expand Down
16 changes: 16 additions & 0 deletions core/nopiterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package core

// ZeroIterator returns no data
type ZeroIterator struct{}

func (ni ZeroIterator) Next() bool {
return false
}

func (ni ZeroIterator) Value() (Event, error) {
return Event{}, nil
}

func (ni ZeroIterator) Close() {
return
}
29 changes: 11 additions & 18 deletions core/testsuite/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,8 @@ func saveAndGetEvents(es core.EventStore) error {
if err != nil {
return err
}
for {
event, err := iterator.Next()
if errors.Is(err, core.ErrNoMoreEvents) {
break
}
for iterator.Next() {
event, err := iterator.Value()
if err != nil {
return err
}
Expand All @@ -150,8 +147,8 @@ func saveAndGetEvents(es core.EventStore) error {
if err != nil {
return err
}
for {
event, err := iterator.Next()
for iterator.Next() {
event, err := iterator.Value()
if err != nil {
break
}
Expand Down Expand Up @@ -194,8 +191,8 @@ func getEventsAfterVersion(es core.EventStore) error {
return err
}

for {
event, err := iterator.Next()
for iterator.Next() {
event, err := iterator.Value()
if err != nil {
break
}
Expand Down Expand Up @@ -256,8 +253,8 @@ func saveAndGetEventsConcurrently(es core.EventStore) error {
return
}
events := make([]core.Event, 0)
for {
event, err := iterator.Next()
for iterator.Next() {
event, err := iterator.Value()
if err != nil {
break
}
Expand All @@ -281,15 +278,11 @@ func getErrWhenNoEvents(es core.EventStore) error {
aggregateID := AggregateID()
iterator, err := es.Get(context.Background(), aggregateID, aggregateType, 0)
if err != nil {
if err != core.ErrNoEvents {
return err
}
return nil
return err
}
defer iterator.Close()
_, err = iterator.Next()
if !errors.Is(err, core.ErrNoMoreEvents) {
return fmt.Errorf("expect error when no events are saved for aggregate")
if iterator.Next() {
return fmt.Errorf("expect no event when no events are saved")
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion eventstore/bbolt/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ require (
golang.org/x/sys v0.10.0 // indirect
)

//replace github.com/hallgren/eventsourcing/core => ../../core
replace github.com/hallgren/eventsourcing/core => ../../core
28 changes: 17 additions & 11 deletions eventstore/bbolt/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,34 +14,40 @@ type iterator struct {
bucketName string
firstEventIndex uint64
cursor *bbolt.Cursor
value []byte
}

// Close closes the iterator
func (i *iterator) Close() {
i.tx.Rollback()
}

// Next return the next event
func (i *iterator) Next() (core.Event, error) {
var k, obj []byte
func (i *iterator) Next() bool {
var value []byte
if i.cursor == nil {
bucket := i.tx.Bucket([]byte(i.bucketName))
if bucket == nil {
return core.Event{}, core.ErrNoMoreEvents
return false
}
i.cursor = bucket.Cursor()
k, obj = i.cursor.Seek(itob(i.firstEventIndex))
if k == nil {
return core.Event{}, core.ErrNoMoreEvents
_, value = i.cursor.Seek(itob(i.firstEventIndex))
if value == nil {
return false
}
} else {
k, obj = i.cursor.Next()
_, value = i.cursor.Next()
}
if k == nil {
return core.Event{}, core.ErrNoMoreEvents
if value == nil {
return false
}
i.value = value
return true
}

// Next return the next event
func (i *iterator) Value() (core.Event, error) {
bEvent := boltEvent{}
err := json.Unmarshal(obj, &bEvent)
err := json.Unmarshal(i.value, &bEvent)
if err != nil {
return core.Event{}, errors.New(fmt.Sprintf("could not deserialize event, %v", err))
}
Expand Down
2 changes: 1 addition & 1 deletion eventstore/esdb/esdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (es *ESDB) Get(ctx context.Context, id string, aggregateType string, afterV
if err != nil {
if err, ok := esdb.FromError(err); !ok {
if err.Code() == esdb.ErrorCodeResourceNotFound {
return nil, core.ErrNoEvents
return core.ZeroIterator{}, nil
}
}
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion eventstore/esdb/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ require (
google.golang.org/grpc v1.57.0 // indirect
)

//replace github.com/hallgren/eventsourcing/core => ../../core
replace github.com/hallgren/eventsourcing/core => ../../core
35 changes: 15 additions & 20 deletions eventstore/esdb/iterator.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package esdb

import (
"errors"
"io"
"strings"

"github.com/EventStore/EventStore-Client-Go/v3/esdb"
Expand All @@ -11,39 +9,36 @@ import (

type iterator struct {
stream *esdb.ReadStream
event *esdb.ResolvedEvent
}

// Close closes the stream
func (i *iterator) Close() {
i.stream.Close()
}

// Next returns next event from the stream
func (i *iterator) Next() (core.Event, error) {

// Next steps to the next event in the stream
func (i *iterator) Next() bool {
eventESDB, err := i.stream.Recv()
if errors.Is(err, io.EOF) {
return core.Event{}, core.ErrNoMoreEvents
}
if err, ok := esdb.FromError(err); !ok {
if err.Code() == esdb.ErrorCodeResourceNotFound {
return core.Event{}, core.ErrNoMoreEvents
}
}
if err != nil {
return core.Event{}, err
return false
}
i.event = eventESDB
return true
}

stream := strings.Split(eventESDB.Event.StreamID, streamSeparator)
// Value returns the event from the stream
func (i *iterator) Value() (core.Event, error) {
stream := strings.Split(i.event.Event.StreamID, streamSeparator)

event := core.Event{
AggregateID: stream[1],
Version: core.Version(eventESDB.Event.EventNumber) + 1, // +1 as the eventsourcing Version starts on 1 but the esdb event version starts on 0
Version: core.Version(i.event.Event.EventNumber) + 1, // +1 as the eventsourcing Version starts on 1 but the esdb event version starts on 0
AggregateType: stream[0],
Timestamp: eventESDB.Event.CreatedDate,
Data: eventESDB.Event.Data,
Metadata: eventESDB.Event.UserMetadata,
Reason: eventESDB.Event.EventType,
Timestamp: i.event.Event.CreatedDate,
Data: i.event.Event.Data,
Metadata: i.event.Event.UserMetadata,
Reason: i.event.Event.EventType,
// Can't get the global version when using the ReadStream method
//GlobalVersion: core.Version(event.Event.Position.Commit),
}
Expand Down
16 changes: 9 additions & 7 deletions eventstore/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@ type Memory struct {
type iterator struct {
events []core.Event
position int
event core.Event
}

func (i *iterator) Next() (core.Event, error) {
func (i *iterator) Next() bool {
if len(i.events) <= i.position {
return core.Event{}, core.ErrNoMoreEvents
return false
}
event := i.events[i.position]
i.event = i.events[i.position]
i.position++
return event, nil
return true
}

func (i *iterator) Value() (core.Event, error) {
return i.event, nil
}

func (i *iterator) Close() {
Expand Down Expand Up @@ -96,9 +101,6 @@ func (e *Memory) Get(ctx context.Context, id string, aggregateType string, after
events = append(events, e)
}
}
if len(events) == 0 {
return nil, core.ErrNoEvents
}
return &iterator{events: events}, nil
}

Expand Down
2 changes: 1 addition & 1 deletion eventstore/sql/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ require (
github.com/ziutek/mymysql v1.5.4 // indirect
)

//replace github.com/hallgren/eventsourcing/core => ../../core
replace github.com/hallgren/eventsourcing/core => ../../core
16 changes: 8 additions & 8 deletions eventstore/sql/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@ type iterator struct {
rows *sql.Rows
}

// Next return the next event
func (i *iterator) Next() (core.Event, error) {
// Next return true if there are more data
func (i *iterator) Next() bool {
return i.rows.Next()
}

// Value return the an event
func (i *iterator) Value() (core.Event, error) {
var globalVersion core.Version
var version core.Version
var id, reason, typ, timestamp string
var data, metadata []byte
if !i.rows.Next() {
if err := i.rows.Err(); err != nil {
return core.Event{}, err
}
return core.Event{}, core.ErrNoMoreEvents
}

if err := i.rows.Scan(&globalVersion, &id, &version, &reason, &typ, &timestamp, &data, &metadata); err != nil {
return core.Event{}, err
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ go 1.13

require github.com/hallgren/eventsourcing/core v0.2.0

//replace github.com/hallgren/eventsourcing/core => ./core
replace github.com/hallgren/eventsourcing/core => ./core
21 changes: 8 additions & 13 deletions repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ func (r *Repository) Subscribers() EventSubscribers {

// Save an aggregates events
func (r *Repository) Save(a aggregate) error {
// TODO: check that the aggregate is registered before saving it?
if !r.register.AggregateRegistered(a) {
return ErrAggregateNotRegistered
}
Expand Down Expand Up @@ -144,27 +143,19 @@ func (r *Repository) GetWithContext(ctx context.Context, id string, a aggregate)
aggregateType := aggregateType(a)
// fetch events after the current version of the aggregate that could be fetched from the snapshot store
eventIterator, err := r.eventStore.Get(ctx, id, aggregateType, core.Version(root.aggregateVersion))
if err != nil && !errors.Is(err, core.ErrNoEvents) {
if err != nil {
return err
} else if errors.Is(err, core.ErrNoEvents) && root.Version() == 0 {
// no events and not based on a snapshot
return ErrAggregateNotFound
}
defer eventIterator.Close()

for {
for eventIterator.Next() {
select {
case <-ctx.Done():
return ctx.Err()
default:
event, err := eventIterator.Next()
if err != nil && !errors.Is(err, core.ErrNoMoreEvents) {
event, err := eventIterator.Value()
if err != nil {
return err
} else if errors.Is(err, core.ErrNoMoreEvents) && root.Version() == 0 {
// no events and no snapshot (some eventstore will not return the error ErrNoEvent on Get())
return ErrAggregateNotFound
} else if errors.Is(err, core.ErrNoMoreEvents) {
return nil
}
// apply the event to the aggregate
f, found := r.register.EventRegistered(event)
Expand All @@ -186,6 +177,10 @@ func (r *Repository) GetWithContext(ctx context.Context, id string, a aggregate)
root.BuildFromHistory(a, []Event{e})
}
}
if a.Root().Version() == 0 {
return ErrAggregateNotFound
}
return nil
}

// Get fetches the aggregates event and build up the aggregate.
Expand Down

0 comments on commit fc04d13

Please sign in to comment.