diff --git a/README.md b/README.md index 6ccc65bf..58ef7a1b 100644 --- a/README.md +++ b/README.md @@ -366,4 +366,172 @@ func (s *snapshot) DeserializeSnapshot(m eventsourcing.DeserializeFunc, b []byte s.unexported = snap.UnExported return nil } +``` + +## Projections + +Projections is a way to build read-models based on events. A read-model is way to expose data from events in a different form. Where the form is optimized for read-only queries. + +If you want more background on projections check out Derek Comartin projections article [Projections in Event Sourcing: Build ANY model you want!](https://codeopinion.com/projections-in-event-sourcing-build-any-model-you-want/) or Martin Fowler's [CQRS](https://martinfowler.com/bliki/CQRS.html). + +### Projection Handler + +The Projection handler is the central part where projections are created. It's available from the event repository by the `eventrepo.Projections` property but can also be created standalone. + +```go +// access via the event repository +eventRepo := eventsourcing.NewEventRepository(eventstore) +ph := eventRepo.Projections + +// standalone without the event repository +ph := eventsourcing.NewProjectionHandler(register, encoder) +``` + +The projection handler include the event register and a encoder to deserialize events from an event store to application event. + +### Projection + +A _projection_ is created from the projection handler via the `Projection()` method. The method takes a `fetchFunc` and a `callbackFunc` and returns a pointer to the projection. + +```go +p := ph.Projection(f fetchFunc, c callbackFunc) +``` + +The fetchFunc must return `(core.Iterator, error)`, i.e the same signature that event stores return when they return events. + +```go +type fetchFunc func() (core.Iterator, error) +``` + +The `callbackFunc` is called for every iterated event inside the projection. The event is typed and can be handled in the same way as the aggregate `Transition()` method. + +```go +type callbackFunc func(e eventsourcing.Event) error +``` + +Example: Creates a projection that fetch all events from an event store and handle them in the callbackF. + +```go +p := eventRepo.Projections.Projection(es.All(0, 1), func(event eventsourcing.Event) error { + switch e := event.Data().(type) { + case *Born: + // handle the event + } + return nil +}) +``` + +### Projection execution + +A projection can be started in three different ways. + +#### RunOnce + +RunOnce fetch events from the event store one time. It returns true if there were events to iterate otherwise false. + +```go +RunOnce() (bool, ProjectionResult) +``` + +#### RunToEnd + +RunToEnd fetch events from the event store until it reaches the end of the event stream. A context is passed in making it possible to cancel the projections from the outside. + +```go +RunToEnd(ctx context.Context) ProjectionResult +``` + +#### Run + +Run will run forever until canceled from the outside. When it hits the end of the event stream it will start a timer and sleep the time set in the projection property `Pace`. + +```go +Run(ctx context.Context) ProjectionResult +``` + +All run methods return a ProjectionResult. + +```go +type ProjectionResult struct { + Error error + ProjectionName string + LastHandledEvent Event +} +``` + +* **Error** Is set if the projection returned an error +* **ProjectionName** Is the name of the projection +* **LastHandledEvent** The last successfully handled event (can be useful during debugging) + +### Projection properties + +A projection have a set of properties that can affect it's behaivior. + +* **Pace** - Is used in the Run method to set how often the projection will poll the event store for new events. +* **Strict** - Default true and it will trigger an error if a fetched event is not registered in the event `Register`. This force all events to be handled by the callbackFunc. +* **Name** - The name of the projection. Can be useful when debugging multiple running projection. The default name is the index it was created from the projection handler. + +### Run multiple projections + +#### Group + +A set of projections can run concurrently in a group. + +```go +g := ph.Group(p1, p2, p3) +``` + +A group is started with `g.Start()` where each projection will run in a separate go routine. Errors from a projection can be retrieved from a error channel `g.ErrChan`. + +The `g.Stop()` method is used to halt all projections in the group and it returns when all projections has stopped. + +```go +// create three projections +p1 := ph.Projection(es.All(0, 1), callbackF) +p2 := ph.Projection(es.All(0, 1), callbackF) +p3 := ph.Projection(es.All(0, 1), callbackF) + +// create a group containing the projections +g := ph.Group(p1, p2, p3) + +// Start runs all projections concurrently +g.Start() + +// Stop terminate all projections and wait for them to return +defer g.Stop() + +// handling error in projection or termination from outside +select { + case result := <-g.ErrChan: + // handle the result that will have an error in the result.Error + case <-doneChan: + // stop signal from the out side + return +} +``` + +#### Race + +Compared to a group the race is a one shot operation. Instead of fetching events continuously it's used to iterate and process all existing events and then return. + +The `Race()` method starts the projections and run them to the end of there event streams. When all projections are finished the method return. + +```go +Race(cancelOnError bool, projections ...*Projection) ([]ProjectionResult, error) +``` + +If `cancelOnError` is set to true the method will halt all projections and return if any projection is returning an error. + +The returned `[]ProjectionResult` is a collection of all projection results. + +Race example: + +```go +// create two projections +ph := eventsourcing.NewProjectionHandler(register, eventsourcing.EncoderJSON{}) +p1 := ph.Projection(es.All(0, 1), callbackF) +p2 := ph.Projection(es.All(0, 1), callbackF) + +// true make the race return on error in any projection +result, err := p.Race(true, r1, r2) ``` \ No newline at end of file diff --git a/eventrepository.go b/eventrepository.go index 1cdda8ce..f3bce4be 100644 --- a/eventrepository.go +++ b/eventrepository.go @@ -50,22 +50,30 @@ type EventRepository struct { // register that convert the Data []byte to correct type register *Register // encoder to serialize / deserialize events - encoder encoder + encoder encoder + Projections *ProjectionHandler } // NewRepository factory function func NewEventRepository(eventStore core.EventStore) *EventRepository { + register := NewRegister() + encoder := EncoderJSON{} + return &EventRepository{ eventStore: eventStore, eventStream: NewEventStream(), - register: NewRegister(), - encoder: EncoderJSON{}, // Default to JSON encoder + register: register, + encoder: encoder, // Default to JSON encoder + Projections: NewProjectionHandler(register, encoder), } } // Encoder change the default JSON encoder that serializer/deserializer events func (er *EventRepository) Encoder(e encoder) { + // set encoder on event repository er.encoder = e + // set encoder in projection handler + er.Projections.Encoder = e } func (er *EventRepository) Register(a aggregate) { diff --git a/eventrepository_test.go b/eventrepository_test.go index ba4e2c0d..48b06406 100644 --- a/eventrepository_test.go +++ b/eventrepository_test.go @@ -349,3 +349,33 @@ func (person *PersonNoRegisterEvents) Transition(event eventsourcing.Event) { // Register bind the events to the repository when the aggregate is registered. func (person *PersonNoRegisterEvents) Register(f eventsourcing.RegisterFunc) { } + +func TestProjectionFromRepo(t *testing.T) { + es := memory.Create() + repo := eventsourcing.NewEventRepository(es) + repo.Register(&Person{}) + + person, err := CreatePerson("kalle") + if err != nil { + t.Fatal(err) + } + err = repo.Save(person) + if err != nil { + t.Fatal(err) + } + + var projectedName string + + p := repo.Projections.Projection(es.All(0, 1), func(event eventsourcing.Event) error { + switch e := event.Data().(type) { + case *Born: + projectedName = e.Name + } + return nil + }) + p.RunOnce() + + if projectedName != "kalle" { + t.Fatalf("expected projectedName to be kalle was %q", projectedName) + } +} diff --git a/eventstore/bbolt/bbolt.go b/eventstore/bbolt/bbolt.go index 4035ee6e..a7a7ac75 100644 --- a/eventstore/bbolt/bbolt.go +++ b/eventstore/bbolt/bbolt.go @@ -170,8 +170,8 @@ func (e *BBolt) Get(ctx context.Context, id string, aggregateType string, afterV return &iterator{tx: tx, cursor: cursor, startPosition: position(afterVersion)}, nil } -// GlobalEvents return count events in order globally from the start posistion -func (e *BBolt) GlobalEvents(start uint64) (core.Iterator, error) { +// All iterate over event in GlobalEvents order +func (e *BBolt) All(start uint64) (core.Iterator, error) { tx, err := e.db.Begin(false) if err != nil { return nil, err diff --git a/eventstore/memory/iterator.go b/eventstore/memory/iterator.go new file mode 100644 index 00000000..3ba52f2c --- /dev/null +++ b/eventstore/memory/iterator.go @@ -0,0 +1,27 @@ +package memory + +import "github.com/hallgren/eventsourcing/core" + +type iterator struct { + events []core.Event + position int + event core.Event +} + +func (i *iterator) Next() bool { + if len(i.events) <= i.position { + return false + } + i.event = i.events[i.position] + i.position++ + return true +} + +func (i *iterator) Value() (core.Event, error) { + return i.event, nil +} + +func (i *iterator) Close() { + i.events = nil + i.position = 0 +} diff --git a/eventstore/memory/memory.go b/eventstore/memory/memory.go index 85f8c4f4..000a2c0b 100644 --- a/eventstore/memory/memory.go +++ b/eventstore/memory/memory.go @@ -14,30 +14,6 @@ type Memory struct { lock sync.Mutex } -type iterator struct { - events []core.Event - position int - event core.Event -} - -func (i *iterator) Next() bool { - if len(i.events) <= i.position { - return false - } - i.event = i.events[i.position] - i.position++ - return true -} - -func (i *iterator) Value() (core.Event, error) { - return i.event, nil -} - -func (i *iterator) Close() { - i.events = nil - i.position = 0 -} - // Create in memory event store func Create() *Memory { return &Memory{ @@ -104,16 +80,24 @@ func (e *Memory) Get(ctx context.Context, id string, aggregateType string, after return &iterator{events: events}, nil } -// GlobalEvents will return count events in order globally from the start posistion -func (e *Memory) GlobalEvents(start, count uint64) ([]core.Event, error) { - var events []core.Event +// Close does nothing +func (e *Memory) Close() {} + +// aggregateKey generates a key to store events against from aggregateType and aggregateID +func aggregateKey(aggregateType, aggregateID string) string { + return aggregateType + "_" + aggregateID +} + +// globalEvents returns count events in order globally from the start position +func (e *Memory) globalEvents(start core.Version, count uint64) ([]core.Event, error) { + events := make([]core.Event, 0, count) // make sure its thread safe e.lock.Lock() defer e.lock.Unlock() for _, e := range e.eventsInOrder { // find start position and append until counter is 0 - if uint64(e.GlobalVersion) >= start { + if e.GlobalVersion >= start { events = append(events, e) count-- if count == 0 { @@ -124,10 +108,21 @@ func (e *Memory) GlobalEvents(start, count uint64) ([]core.Event, error) { return events, nil } -// Close does nothing -func (e *Memory) Close() {} +// All iterate over all events in GlobalEvents order +func (m *Memory) All(start core.Version, count uint64) func() (core.Iterator, error) { + return func() (core.Iterator, error) { + events, err := m.globalEvents(start, count) + if err != nil { + return nil, err + } -// aggregateKey generate a aggregate key to store events against from aggregateType and aggregateID -func aggregateKey(aggregateType, aggregateID string) string { - return aggregateType + "_" + aggregateID + // no events to fetch + if len(events) == 0 { + return core.ZeroIterator{}, nil + } + + // next time the function is called it will start from the last fetched event +1 + start = events[len(events)-1].GlobalVersion + 1 + return &iterator{events: events}, nil + } } diff --git a/eventstore/sql/sql.go b/eventstore/sql/sql.go index b5ffb960..b1f2aade 100644 --- a/eventstore/sql/sql.go +++ b/eventstore/sql/sql.go @@ -88,46 +88,12 @@ func (s *SQL) Get(ctx context.Context, id string, aggregateType string, afterVer return &iterator{rows: rows}, nil } -// GlobalEvents return count events in order globally from the start posistion -func (s *SQL) GlobalEvents(start, count uint64) ([]core.Event, error) { +// All iterate over all event in GlobalEvents order +func (s *SQL) All(start core.Version, count uint64) (core.Iterator, error) { selectStm := `Select seq, id, version, reason, type, timestamp, data, metadata from events where seq >= ? order by seq asc LIMIT ?` rows, err := s.db.Query(selectStm, start, count) if err != nil { return nil, err } - defer rows.Close() - return s.eventsFromRows(rows) -} - -func (s *SQL) eventsFromRows(rows *sql.Rows) ([]core.Event, error) { - var events []core.Event - for rows.Next() { - var globalVersion core.Version - var version core.Version - var id, reason, typ, timestamp string - var data, metadata []byte - if err := rows.Scan(&globalVersion, &id, &version, &reason, &typ, ×tamp, &data, &metadata); err != nil { - return nil, err - } - - t, err := time.Parse(time.RFC3339, timestamp) - if err != nil { - return nil, err - } - - events = append(events, core.Event{ - AggregateID: id, - Version: version, - GlobalVersion: globalVersion, - AggregateType: typ, - Timestamp: t, - Data: data, - Metadata: metadata, - Reason: reason, - }) - } - if err := rows.Err(); err != nil { - return nil, err - } - return events, nil + return &iterator{rows: rows}, nil } diff --git a/projections.go b/projections.go new file mode 100644 index 00000000..226aa1b0 --- /dev/null +++ b/projections.go @@ -0,0 +1,243 @@ +package eventsourcing + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/hallgren/eventsourcing/core" +) + +type fetchFunc func() (core.Iterator, error) +type callbackFunc func(e Event) error + +type ProjectionHandler struct { + register *Register + Encoder encoder + count int +} + +func NewProjectionHandler(register *Register, encoder encoder) *ProjectionHandler { + return &ProjectionHandler{ + register: register, + Encoder: encoder, + } +} + +type Projection struct { + fetchF fetchFunc + callbackF callbackFunc + handler *ProjectionHandler + Pace time.Duration // Pace is used when a projection is running and it reaches the end of the event stream + Strict bool // Strict indicate if the projection should return error if the event it fetches is not found in the register + Name string +} + +// Group runs projections concurrently +type Group struct { + handler *ProjectionHandler + projections []*Projection + cancelF context.CancelFunc + wg sync.WaitGroup + ErrChan chan ProjectionResult +} + +// ProjectionResult is the return type for a Group and Race +type ProjectionResult struct { + Error error + Name string + LastHandledEvent Event +} + +// Projection creates a projection that will run down an event stream +func (ph *ProjectionHandler) Projection(fetchF fetchFunc, callbackF callbackFunc) *Projection { + projection := Projection{ + fetchF: fetchF, + callbackF: callbackF, + handler: ph, + Pace: time.Second * 10, // Default pace 10 seconds + Strict: true, // Default strict is active + Name: fmt.Sprintf("%d", ph.count), // Default the name to it's creation index + } + ph.count++ + return &projection +} + +// Run runs the projection forever until the context is cancelled. When there are no more events to consume it +// sleeps the set pace before it runs again. +func (p *Projection) Run(ctx context.Context) ProjectionResult { + var result ProjectionResult + timer := time.NewTimer(0) + for { + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return ProjectionResult{Error: ctx.Err(), Name: result.Name, LastHandledEvent: result.LastHandledEvent} + case <-timer.C: + result = p.RunToEnd(ctx) + if result.Error != nil { + return result + } + } + timer.Reset(p.Pace) + } +} + +// RunToEnd runs until the projection reaches the end of the event stream +func (p *Projection) RunToEnd(ctx context.Context) ProjectionResult { + var result ProjectionResult + var lastHandledEvent Event + + for { + select { + case <-ctx.Done(): + return ProjectionResult{Error: ctx.Err(), Name: result.Name, LastHandledEvent: result.LastHandledEvent} + default: + ran, result := p.RunOnce() + // if the first event returned error or if it did not run at all + if result.LastHandledEvent.GlobalVersion() == 0 { + result.LastHandledEvent = lastHandledEvent + } + if result.Error != nil { + return result + } + // hit the end of the event stream + if !ran { + return result + } + lastHandledEvent = result.LastHandledEvent + } + } +} + +// RunOnce runs the fetch method one time +func (p *Projection) RunOnce() (bool, ProjectionResult) { + // ran indicate if there were events to fetch + var ran bool + var lastHandledEvent Event + + iterator, err := p.fetchF() + if err != nil { + return false, ProjectionResult{Error: err, Name: p.Name, LastHandledEvent: lastHandledEvent} + } + defer iterator.Close() + + for iterator.Next() { + ran = true + event, err := iterator.Value() + if err != nil { + return false, ProjectionResult{Error: err, Name: p.Name, LastHandledEvent: lastHandledEvent} + } + + // TODO: is only registered events of interest? + f, found := p.handler.register.EventRegistered(event) + if !found { + if p.Strict { + err = fmt.Errorf("event not registered aggregate type: %s, reason: %s, global version: %d, %w", event.AggregateType, event.Reason, event.GlobalVersion, ErrEventNotRegistered) + return false, ProjectionResult{Error: err, Name: p.Name, LastHandledEvent: lastHandledEvent} + } + continue + } + + data := f() + err = p.handler.Encoder.Deserialize(event.Data, &data) + if err != nil { + return false, ProjectionResult{Error: err, Name: p.Name, LastHandledEvent: lastHandledEvent} + } + + metadata := make(map[string]interface{}) + if event.Metadata != nil { + err = p.handler.Encoder.Deserialize(event.Metadata, &metadata) + if err != nil { + return false, ProjectionResult{Error: err, Name: p.Name, LastHandledEvent: lastHandledEvent} + } + } + e := NewEvent(event, data, metadata) + + err = p.callbackF(e) + if err != nil { + return false, ProjectionResult{Error: err, Name: p.Name, LastHandledEvent: lastHandledEvent} + } + // keep a reference to the last successfully handled event + lastHandledEvent = e + } + return ran, ProjectionResult{Error: nil, Name: p.Name, LastHandledEvent: lastHandledEvent} +} + +// Group runs a group of projections concurrently +func (ph *ProjectionHandler) Group(projections ...*Projection) *Group { + return &Group{ + handler: ph, + projections: projections, + cancelF: func() {}, + ErrChan: make(chan ProjectionResult), + } +} + +// Start starts all projectinos in the group, an error channel i created on the group to notify +// if a result containing an error is returned from a projection +func (g *Group) Start() { + ctx, cancel := context.WithCancel(context.Background()) + g.cancelF = cancel + + g.wg.Add(len(g.projections)) + for _, projection := range g.projections { + go func(p *Projection) { + defer g.wg.Done() + result := p.Run(ctx) + if !errors.Is(result.Error, context.Canceled) { + g.ErrChan <- result + } + }(projection) + } +} + +// Stop terminate all projections in the group +func (g *Group) Stop() { + g.cancelF() + + // return when all projections has stopped + g.wg.Wait() + + // close the error channel + close(g.ErrChan) +} + +// Race runs the projections to the end of the events streams. +// Can be used on a stale event stream with no more events coming in or when you want to know when all projections are done. +func (p *ProjectionHandler) Race(cancelOnError bool, projections ...*Projection) ([]ProjectionResult, error) { + var lock sync.Mutex + var causingErr error + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wg := sync.WaitGroup{} + wg.Add(len(projections)) + + results := make([]ProjectionResult, len(projections)) + for i, projection := range projections { + go func(pr *Projection, index int) { + defer wg.Done() + result := pr.RunToEnd(ctx) + if result.Error != nil { + if !errors.Is(result.Error, context.Canceled) && cancelOnError { + cancel() + + lock.Lock() + causingErr = result.Error + lock.Unlock() + } + } + lock.Lock() + results[index] = result + lock.Unlock() + }(projection, i) + } + wg.Wait() + return results, causingErr +} diff --git a/projections_test.go b/projections_test.go new file mode 100644 index 00000000..ba803bcc --- /dev/null +++ b/projections_test.go @@ -0,0 +1,331 @@ +package eventsourcing_test + +import ( + "context" + "encoding/json" + "errors" + "testing" + "time" + + "github.com/hallgren/eventsourcing" + "github.com/hallgren/eventsourcing/core" + "github.com/hallgren/eventsourcing/eventstore/memory" +) + +func createPersonEvent(es *memory.Memory, name string, age int) error { + person, err := CreatePerson(name) + if err != nil { + return err + } + + for i := 0; i < age; i++ { + person.GrowOlder() + } + + events := make([]core.Event, 0) + for _, e := range person.Events() { + data, err := json.Marshal(e.Data()) + if err != nil { + return err + } + + events = append(events, core.Event{ + AggregateID: e.AggregateID(), + Reason: e.Reason(), + AggregateType: e.AggregateType(), + Version: core.Version(e.Version()), + GlobalVersion: core.Version(e.GlobalVersion()), + Timestamp: e.Timestamp(), + Data: data, + }) + } + return es.Save(events) +} + +func TestRunOnce(t *testing.T) { + // setup + es := memory.Create() + register := eventsourcing.NewRegister() + register.Register(&Person{}) + + projectedName := "" + + err := createPersonEvent(es, "kalle", 0) + if err != nil { + t.Fatal(err) + } + + err = createPersonEvent(es, "anka", 0) + if err != nil { + t.Fatal(err) + } + + // run projection one event at each run + p := eventsourcing.NewProjectionHandler(register, eventsourcing.EncoderJSON{}) + proj := p.Projection(es.All(0, 1), func(event eventsourcing.Event) error { + switch e := event.Data().(type) { + case *Born: + projectedName = e.Name + } + return nil + }) + + // should set projectedName to kalle + work, result := proj.RunOnce() + if result.Error != nil { + t.Fatal(err) + } + + if !work { + t.Fatal("there was no work to do") + } + if projectedName != "kalle" { + t.Fatalf("expected %q was %q", "kalle", projectedName) + } + + // should set the projected name to anka + work, result = proj.RunOnce() + if result.Error != nil { + t.Fatal(err) + } + + if !work { + t.Fatal("there was no work to do") + } + if projectedName != "anka" { + t.Fatalf("expected %q was %q", "anka", projectedName) + } +} + +func TestRun(t *testing.T) { + // setup + es := memory.Create() + register := eventsourcing.NewRegister() + register.Register(&Person{}) + + projectedName := "" + sourceName := "kalle" + + err := createPersonEvent(es, sourceName, 1) + if err != nil { + t.Fatal(err) + } + + // run projection + p := eventsourcing.NewProjectionHandler(register, eventsourcing.EncoderJSON{}) + proj := p.Projection(es.All(0, 1), func(event eventsourcing.Event) error { + switch e := event.Data().(type) { + case *Born: + projectedName = e.Name + } + return nil + }) + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + defer cancel() + + // will run once then sleep 10 seconds + result := proj.Run(ctx) + if !errors.Is(result.Error, context.DeadlineExceeded) { + t.Fatal(err) + } + + if projectedName != sourceName { + t.Fatalf("expected %q was %q", sourceName, projectedName) + } +} + +func TestCloseEmptyGroup(t *testing.T) { + p := eventsourcing.NewProjectionHandler(eventsourcing.NewRegister(), eventsourcing.EncoderJSON{}) + g := p.Group() + g.Stop() +} + +func TestStartMultipleProjections(t *testing.T) { + // setup + es := memory.Create() + register := eventsourcing.NewRegister() + + // callback that handles the events + callbackF := func(event eventsourcing.Event) error { + return nil + } + + p := eventsourcing.NewProjectionHandler(register, eventsourcing.EncoderJSON{}) + r1 := p.Projection(es.All(0, 1), callbackF) + r2 := p.Projection(es.All(0, 1), callbackF) + r3 := p.Projection(es.All(0, 1), callbackF) + + g := p.Group(r1, r2, r3) + g.Start() + g.Stop() +} + +func TestErrorFromCallback(t *testing.T) { + // setup + es := memory.Create() + register := eventsourcing.NewRegister() + register.Register(&Person{}) + + err := createPersonEvent(es, "kalle", 1) + if err != nil { + t.Fatal(err) + } + + // define application error that can be returned from the callback function + var ErrApplication = errors.New("application error") + + // callback that handles the events + callbackF := func(event eventsourcing.Event) error { + return ErrApplication + } + + p := eventsourcing.NewProjectionHandler(register, eventsourcing.EncoderJSON{}) + r := p.Projection(es.All(0, 1), callbackF) + + g := p.Group(r) + + g.Start() + defer g.Stop() + + var result eventsourcing.ProjectionResult + + select { + case result = <-g.ErrChan: + case <-time.After(time.Second): + t.Fatal("test timed out") + } + + if !errors.Is(result.Error, ErrApplication) { + if err != nil { + t.Fatalf("expected application error but got %s", err.Error()) + } + t.Fatal("got none error expected ErrApplication") + } +} + +func TestStrict(t *testing.T) { + // setup + es := memory.Create() + register := eventsourcing.NewRegister() + + // We do not register the Person aggregate with the Born event attached + err := createPersonEvent(es, "kalle", 1) + if err != nil { + t.Fatal(err) + } + + p := eventsourcing.NewProjectionHandler(register, eventsourcing.EncoderJSON{}) + proj := p.Projection(es.All(0, 1), func(event eventsourcing.Event) error { + return nil + }) + + _, result := proj.RunOnce() + if !errors.Is(result.Error, eventsourcing.ErrEventNotRegistered) { + t.Fatalf("expected ErrEventNotRegistered got %q", err.Error()) + } +} + +func TestRace(t *testing.T) { + // setup + es := memory.Create() + register := eventsourcing.NewRegister() + register.Register(&Person{}) + + err := createPersonEvent(es, "kalle", 50) + if err != nil { + t.Fatal(err) + } + + // callback that handles the events + callbackF := func(event eventsourcing.Event) error { + time.Sleep(time.Millisecond * 2) + return nil + } + + applicationErr := errors.New("an error") + + p := eventsourcing.NewProjectionHandler(register, eventsourcing.EncoderJSON{}) + r1 := p.Projection(es.All(0, 1), callbackF) + r2 := p.Projection(es.All(0, 1), func(e eventsourcing.Event) error { + time.Sleep(time.Millisecond) + if e.GlobalVersion() == 31 { + return applicationErr + } + return nil + }) + + result, err := p.Race(true, r1, r2) + + // causing err should be applicationErr + if !errors.Is(err, applicationErr) { + t.Fatalf("expected causing error to be applicationErr got %v", err) + } + + // projection 0 should have a context.Canceled error + if !errors.Is(result[0].Error, context.Canceled) { + t.Fatalf("expected projection %q to have err 'context.Canceled' got %v", result[0].Name, result[0].Error) + } + + // projection 1 should have a applicationErr error + if !errors.Is(result[1].Error, applicationErr) { + t.Fatalf("expected projection %q to have err 'applicationErr' got %v", result[1].Name, result[1].Error) + } + + // projection 1 should have halted on event with GlobalVersion 30 + if result[1].LastHandledEvent.GlobalVersion() != 30 { + t.Fatalf("expected projection 1 Event.GlobalVersion() to be 30 but was %d", result[1].LastHandledEvent.GlobalVersion()) + } +} + +func TestKeepStartPosition(t *testing.T) { + // setup + es := memory.Create() + register := eventsourcing.NewRegister() + register.Register(&Person{}) + + err := createPersonEvent(es, "kalle", 5) + if err != nil { + t.Fatal(err) + } + + start := core.Version(0) + counter := 0 + + // callback that handles the events + callbackF := func(event eventsourcing.Event) error { + switch event.Data().(type) { + case *AgedOneYear: + counter++ + } + start = core.Version(event.GlobalVersion() + 1) + return nil + } + + p := eventsourcing.NewProjectionHandler(register, eventsourcing.EncoderJSON{}) + r := p.Projection(es.All(0, 1), callbackF) + + _, err = p.Race(true, r) + if err != nil { + t.Fatal(err) + } + + err = createPersonEvent(es, "anka", 5) + if err != nil { + t.Fatal(err) + } + + _, err = p.Race(true, r) + if err != nil { + t.Fatal(err) + } + + // Born 2 + AgedOnYear 5 + 5 = 12 + Next Event 1 = 13 + if start != 13 { + t.Fatalf("expected start to be 13 was %d", start) + } + + if counter != 10 { + t.Fatalf("expected counter to be 10 was %d", counter) + } +}