Skip to content

Commit

Permalink
Merge pull request #104 from hallgren/projections
Browse files Browse the repository at this point in the history
Projections
  • Loading branch information
hallgren authored Mar 4, 2024
2 parents eaf9530 + 690e229 commit ac99b92
Show file tree
Hide file tree
Showing 9 changed files with 843 additions and 75 deletions.
168 changes: 168 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,4 +366,172 @@ func (s *snapshot) DeserializeSnapshot(m eventsourcing.DeserializeFunc, b []byte
s.unexported = snap.UnExported
return nil
}
```

## Projections

Projections is a way to build read-models based on events. A read-model is way to expose data from events in a different form. Where the form is optimized for read-only queries.

If you want more background on projections check out Derek Comartin projections article [Projections in Event Sourcing: Build ANY model you want!](https://codeopinion.com/projections-in-event-sourcing-build-any-model-you-want/) or Martin Fowler's [CQRS](https://martinfowler.com/bliki/CQRS.html).

### Projection Handler

The Projection handler is the central part where projections are created. It's available from the event repository by the `eventrepo.Projections` property but can also be created standalone.

```go
// access via the event repository
eventRepo := eventsourcing.NewEventRepository(eventstore)
ph := eventRepo.Projections

// standalone without the event repository
ph := eventsourcing.NewProjectionHandler(register, encoder)
```

The projection handler include the event register and a encoder to deserialize events from an event store to application event.

### Projection

A _projection_ is created from the projection handler via the `Projection()` method. The method takes a `fetchFunc` and a `callbackFunc` and returns a pointer to the projection.

```go
p := ph.Projection(f fetchFunc, c callbackFunc)
```

The fetchFunc must return `(core.Iterator, error)`, i.e the same signature that event stores return when they return events.

```go
type fetchFunc func() (core.Iterator, error)
```

The `callbackFunc` is called for every iterated event inside the projection. The event is typed and can be handled in the same way as the aggregate `Transition()` method.

```go
type callbackFunc func(e eventsourcing.Event) error
```

Example: Creates a projection that fetch all events from an event store and handle them in the callbackF.

```go
p := eventRepo.Projections.Projection(es.All(0, 1), func(event eventsourcing.Event) error {
switch e := event.Data().(type) {
case *Born:
// handle the event
}
return nil
})
```

### Projection execution

A projection can be started in three different ways.

#### RunOnce

RunOnce fetch events from the event store one time. It returns true if there were events to iterate otherwise false.

```go
RunOnce() (bool, ProjectionResult)
```

#### RunToEnd

RunToEnd fetch events from the event store until it reaches the end of the event stream. A context is passed in making it possible to cancel the projections from the outside.

```go
RunToEnd(ctx context.Context) ProjectionResult
```

#### Run

Run will run forever until canceled from the outside. When it hits the end of the event stream it will start a timer and sleep the time set in the projection property `Pace`.

```go
Run(ctx context.Context) ProjectionResult
```

All run methods return a ProjectionResult.

```go
type ProjectionResult struct {
Error error
ProjectionName string
LastHandledEvent Event
}
```

* **Error** Is set if the projection returned an error
* **ProjectionName** Is the name of the projection
* **LastHandledEvent** The last successfully handled event (can be useful during debugging)

### Projection properties

A projection have a set of properties that can affect it's behaivior.

* **Pace** - Is used in the Run method to set how often the projection will poll the event store for new events.
* **Strict** - Default true and it will trigger an error if a fetched event is not registered in the event `Register`. This force all events to be handled by the callbackFunc.
* **Name** - The name of the projection. Can be useful when debugging multiple running projection. The default name is the index it was created from the projection handler.

### Run multiple projections

#### Group

A set of projections can run concurrently in a group.

```go
g := ph.Group(p1, p2, p3)
```

A group is started with `g.Start()` where each projection will run in a separate go routine. Errors from a projection can be retrieved from a error channel `g.ErrChan`.

The `g.Stop()` method is used to halt all projections in the group and it returns when all projections has stopped.

```go
// create three projections
p1 := ph.Projection(es.All(0, 1), callbackF)
p2 := ph.Projection(es.All(0, 1), callbackF)
p3 := ph.Projection(es.All(0, 1), callbackF)

// create a group containing the projections
g := ph.Group(p1, p2, p3)

// Start runs all projections concurrently
g.Start()

// Stop terminate all projections and wait for them to return
defer g.Stop()

// handling error in projection or termination from outside
select {
case result := <-g.ErrChan:
// handle the result that will have an error in the result.Error
case <-doneChan:
// stop signal from the out side
return
}
```

#### Race

Compared to a group the race is a one shot operation. Instead of fetching events continuously it's used to iterate and process all existing events and then return.

The `Race()` method starts the projections and run them to the end of there event streams. When all projections are finished the method return.

```go
Race(cancelOnError bool, projections ...*Projection) ([]ProjectionResult, error)
```

If `cancelOnError` is set to true the method will halt all projections and return if any projection is returning an error.

The returned `[]ProjectionResult` is a collection of all projection results.

Race example:

```go
// create two projections
ph := eventsourcing.NewProjectionHandler(register, eventsourcing.EncoderJSON{})
p1 := ph.Projection(es.All(0, 1), callbackF)
p2 := ph.Projection(es.All(0, 1), callbackF)

// true make the race return on error in any projection
result, err := p.Race(true, r1, r2)
```
14 changes: 11 additions & 3 deletions eventrepository.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,30 @@ type EventRepository struct {
// register that convert the Data []byte to correct type
register *Register
// encoder to serialize / deserialize events
encoder encoder
encoder encoder
Projections *ProjectionHandler
}

// NewRepository factory function
func NewEventRepository(eventStore core.EventStore) *EventRepository {
register := NewRegister()
encoder := EncoderJSON{}

return &EventRepository{
eventStore: eventStore,
eventStream: NewEventStream(),
register: NewRegister(),
encoder: EncoderJSON{}, // Default to JSON encoder
register: register,
encoder: encoder, // Default to JSON encoder
Projections: NewProjectionHandler(register, encoder),
}
}

// Encoder change the default JSON encoder that serializer/deserializer events
func (er *EventRepository) Encoder(e encoder) {
// set encoder on event repository
er.encoder = e
// set encoder in projection handler
er.Projections.Encoder = e
}

func (er *EventRepository) Register(a aggregate) {
Expand Down
30 changes: 30 additions & 0 deletions eventrepository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,3 +349,33 @@ func (person *PersonNoRegisterEvents) Transition(event eventsourcing.Event) {
// Register bind the events to the repository when the aggregate is registered.
func (person *PersonNoRegisterEvents) Register(f eventsourcing.RegisterFunc) {
}

func TestProjectionFromRepo(t *testing.T) {
es := memory.Create()
repo := eventsourcing.NewEventRepository(es)
repo.Register(&Person{})

person, err := CreatePerson("kalle")
if err != nil {
t.Fatal(err)
}
err = repo.Save(person)
if err != nil {
t.Fatal(err)
}

var projectedName string

p := repo.Projections.Projection(es.All(0, 1), func(event eventsourcing.Event) error {
switch e := event.Data().(type) {
case *Born:
projectedName = e.Name
}
return nil
})
p.RunOnce()

if projectedName != "kalle" {
t.Fatalf("expected projectedName to be kalle was %q", projectedName)
}
}
4 changes: 2 additions & 2 deletions eventstore/bbolt/bbolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ func (e *BBolt) Get(ctx context.Context, id string, aggregateType string, afterV
return &iterator{tx: tx, cursor: cursor, startPosition: position(afterVersion)}, nil
}

// GlobalEvents return count events in order globally from the start posistion
func (e *BBolt) GlobalEvents(start uint64) (core.Iterator, error) {
// All iterate over event in GlobalEvents order
func (e *BBolt) All(start uint64) (core.Iterator, error) {
tx, err := e.db.Begin(false)
if err != nil {
return nil, err
Expand Down
27 changes: 27 additions & 0 deletions eventstore/memory/iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package memory

import "github.com/hallgren/eventsourcing/core"

type iterator struct {
events []core.Event
position int
event core.Event
}

func (i *iterator) Next() bool {
if len(i.events) <= i.position {
return false
}
i.event = i.events[i.position]
i.position++
return true
}

func (i *iterator) Value() (core.Event, error) {
return i.event, nil
}

func (i *iterator) Close() {
i.events = nil
i.position = 0
}
61 changes: 28 additions & 33 deletions eventstore/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,6 @@ type Memory struct {
lock sync.Mutex
}

type iterator struct {
events []core.Event
position int
event core.Event
}

func (i *iterator) Next() bool {
if len(i.events) <= i.position {
return false
}
i.event = i.events[i.position]
i.position++
return true
}

func (i *iterator) Value() (core.Event, error) {
return i.event, nil
}

func (i *iterator) Close() {
i.events = nil
i.position = 0
}

// Create in memory event store
func Create() *Memory {
return &Memory{
Expand Down Expand Up @@ -104,16 +80,24 @@ func (e *Memory) Get(ctx context.Context, id string, aggregateType string, after
return &iterator{events: events}, nil
}

// GlobalEvents will return count events in order globally from the start posistion
func (e *Memory) GlobalEvents(start, count uint64) ([]core.Event, error) {
var events []core.Event
// Close does nothing
func (e *Memory) Close() {}

// aggregateKey generates a key to store events against from aggregateType and aggregateID
func aggregateKey(aggregateType, aggregateID string) string {
return aggregateType + "_" + aggregateID
}

// globalEvents returns count events in order globally from the start position
func (e *Memory) globalEvents(start core.Version, count uint64) ([]core.Event, error) {
events := make([]core.Event, 0, count)
// make sure its thread safe
e.lock.Lock()
defer e.lock.Unlock()

for _, e := range e.eventsInOrder {
// find start position and append until counter is 0
if uint64(e.GlobalVersion) >= start {
if e.GlobalVersion >= start {
events = append(events, e)
count--
if count == 0 {
Expand All @@ -124,10 +108,21 @@ func (e *Memory) GlobalEvents(start, count uint64) ([]core.Event, error) {
return events, nil
}

// Close does nothing
func (e *Memory) Close() {}
// All iterate over all events in GlobalEvents order
func (m *Memory) All(start core.Version, count uint64) func() (core.Iterator, error) {
return func() (core.Iterator, error) {
events, err := m.globalEvents(start, count)
if err != nil {
return nil, err
}

// aggregateKey generate a aggregate key to store events against from aggregateType and aggregateID
func aggregateKey(aggregateType, aggregateID string) string {
return aggregateType + "_" + aggregateID
// no events to fetch
if len(events) == 0 {
return core.ZeroIterator{}, nil
}

// next time the function is called it will start from the last fetched event +1
start = events[len(events)-1].GlobalVersion + 1
return &iterator{events: events}, nil
}
}
Loading

0 comments on commit ac99b92

Please sign in to comment.